mirror of https://github.com/milvus-io/milvus.git
enhance: Use ChannelManger interface in Server (#29629)
See also: #29447 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/29771/head
parent
97e4ec5a69
commit
75e6b65c60
|
@ -43,7 +43,7 @@ type CompactionPlanHandlerSuite struct {
|
||||||
mockAlloc *NMockAllocator
|
mockAlloc *NMockAllocator
|
||||||
mockSch *MockScheduler
|
mockSch *MockScheduler
|
||||||
mockCm *MockChannelManager
|
mockCm *MockChannelManager
|
||||||
mockSession *MockSessionManager
|
mockSessMgr *MockSessionManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) SetupTest() {
|
func (s *CompactionPlanHandlerSuite) SetupTest() {
|
||||||
|
@ -51,7 +51,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
|
||||||
s.mockAlloc = NewNMockAllocator(s.T())
|
s.mockAlloc = NewNMockAllocator(s.T())
|
||||||
s.mockSch = NewMockScheduler(s.T())
|
s.mockSch = NewMockScheduler(s.T())
|
||||||
s.mockCm = NewMockChannelManager(s.T())
|
s.mockCm = NewMockChannelManager(s.T())
|
||||||
s.mockSession = NewMockSessionManager(s.T())
|
s.mockSessMgr = NewMockSessionManager(s.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
|
func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
|
||||||
|
@ -76,7 +76,7 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestCheckResult() {
|
func (s *CompactionPlanHandlerSuite) TestCheckResult() {
|
||||||
s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
|
s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
|
||||||
1: {PlanID: 1, State: commonpb.CompactionState_Executing},
|
1: {PlanID: 1, State: commonpb.CompactionState_Executing},
|
||||||
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
|
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
|
||||||
3: {PlanID: 3, State: commonpb.CompactionState_Executing},
|
3: {PlanID: 3, State: commonpb.CompactionState_Executing},
|
||||||
|
@ -84,13 +84,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
|
||||||
})
|
})
|
||||||
{
|
{
|
||||||
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
|
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
|
||||||
handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
|
||||||
handler.checkResult()
|
handler.checkResult()
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
|
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
|
||||||
handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
|
||||||
handler.checkResult()
|
handler.checkResult()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
||||||
{"channel with no error", "ch-2", false},
|
{"channel with no error", "ch-2", false},
|
||||||
}
|
}
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(nil, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.scheduler = s.mockSch
|
handler.scheduler = s.mockSch
|
||||||
|
|
||||||
for idx, test := range tests {
|
for idx, test := range tests {
|
||||||
|
@ -310,7 +310,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
||||||
|
|
||||||
s.Run("illegal nil result", func() {
|
s.Run("illegal nil result", func() {
|
||||||
s.SetupTest()
|
s.SetupTest()
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
err := handler.handleMergeCompactionResult(nil, nil)
|
err := handler.handleMergeCompactionResult(nil, nil)
|
||||||
s.Error(err)
|
s.Error(err)
|
||||||
})
|
})
|
||||||
|
@ -324,9 +324,9 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}).Once()
|
}).Once()
|
||||||
s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||||
|
|
||||||
compactionResult := &datapb.CompactionPlanResult{
|
compactionResult := &datapb.CompactionPlanResult{
|
||||||
|
@ -345,7 +345,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
||||||
s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
||||||
nil, nil, nil, errors.New("mock error")).Once()
|
nil, nil, nil, errors.New("mock error")).Once()
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||||
compactionResult := &datapb.CompactionPlanResult{
|
compactionResult := &datapb.CompactionPlanResult{
|
||||||
PlanID: plan.PlanID,
|
PlanID: plan.PlanID,
|
||||||
|
@ -368,7 +368,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
||||||
s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
|
s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
|
||||||
Return(errors.New("mock error")).Once()
|
Return(errors.New("mock error")).Once()
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||||
compactionResult := &datapb.CompactionPlanResult{
|
compactionResult := &datapb.CompactionPlanResult{
|
||||||
PlanID: plan.PlanID,
|
PlanID: plan.PlanID,
|
||||||
|
@ -390,9 +390,9 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
||||||
&segMetricMutation{}, nil).Once()
|
&segMetricMutation{}, nil).Once()
|
||||||
s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
|
s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
|
||||||
Return(nil).Once()
|
Return(nil).Once()
|
||||||
s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||||
compactionResult := &datapb.CompactionPlanResult{
|
compactionResult := &datapb.CompactionPlanResult{
|
||||||
PlanID: plan.PlanID,
|
PlanID: plan.PlanID,
|
||||||
|
@ -422,7 +422,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Run("test complete merge compaction task", func() {
|
s.Run("test complete merge compaction task", func() {
|
||||||
s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||||
// mock for handleMergeCompactionResult
|
// mock for handleMergeCompactionResult
|
||||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
|
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
|
||||||
s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
||||||
|
@ -490,7 +490,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
c := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
c := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
c.scheduler = s.mockSch
|
c.scheduler = s.mockSch
|
||||||
c.plans = plans
|
c.plans = plans
|
||||||
|
|
||||||
|
@ -536,7 +536,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
|
func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
|
||||||
s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
|
s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
|
||||||
1: {PlanID: 1, State: commonpb.CompactionState_Executing},
|
1: {PlanID: 1, State: commonpb.CompactionState_Executing},
|
||||||
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
|
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
|
||||||
3: {PlanID: 3, State: commonpb.CompactionState_Executing},
|
3: {PlanID: 3, State: commonpb.CompactionState_Executing},
|
||||||
|
@ -565,7 +565,7 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
|
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||||
handler.plans = inPlans
|
handler.plans = inPlans
|
||||||
|
|
||||||
err := handler.updateCompaction(0)
|
err := handler.updateCompaction(0)
|
||||||
|
|
|
@ -117,7 +117,7 @@ type Server struct {
|
||||||
allocator allocator
|
allocator allocator
|
||||||
cluster Cluster
|
cluster Cluster
|
||||||
sessionManager SessionManager
|
sessionManager SessionManager
|
||||||
channelManager *ChannelManagerImpl
|
channelManager ChannelManager
|
||||||
rootCoordClient types.RootCoordClient
|
rootCoordClient types.RootCoordClient
|
||||||
garbageCollector *garbageCollector
|
garbageCollector *garbageCollector
|
||||||
gcOpt GcOption
|
gcOpt GcOption
|
||||||
|
|
|
@ -37,8 +37,6 @@ import (
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
grpcStatus "google.golang.org/grpc/status"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
|
@ -95,135 +93,6 @@ func TestMain(m *testing.M) {
|
||||||
os.Exit(code)
|
os.Exit(code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetSegmentInfoChannel(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
t.Run("get segment info channel", func(t *testing.T) {
|
|
||||||
resp, err := svr.GetSegmentInfoChannel(context.TODO(), nil)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
||||||
assert.EqualValues(t, Params.CommonCfg.DataCoordSegmentInfo.GetValue(), resp.Value)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAssignSegmentID(t *testing.T) {
|
|
||||||
const collID = 100
|
|
||||||
const collIDInvalid = 101
|
|
||||||
const partID = 0
|
|
||||||
const channel0 = "channel0"
|
|
||||||
|
|
||||||
t.Run("assign segment normally", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
schema := newTestSchema()
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: collID,
|
|
||||||
Schema: schema,
|
|
||||||
Partitions: []int64{},
|
|
||||||
})
|
|
||||||
req := &datapb.SegmentIDRequest{
|
|
||||||
Count: 1000,
|
|
||||||
ChannelName: channel0,
|
|
||||||
CollectionID: collID,
|
|
||||||
PartitionID: partID,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
|
||||||
NodeID: 0,
|
|
||||||
PeerRole: "",
|
|
||||||
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
|
|
||||||
assign := resp.SegIDAssignments[0]
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode())
|
|
||||||
assert.EqualValues(t, collID, assign.CollectionID)
|
|
||||||
assert.EqualValues(t, partID, assign.PartitionID)
|
|
||||||
assert.EqualValues(t, channel0, assign.ChannelName)
|
|
||||||
assert.EqualValues(t, 1000, assign.Count)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("assign segment for bulkload", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
schema := newTestSchema()
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: collID,
|
|
||||||
Schema: schema,
|
|
||||||
Partitions: []int64{},
|
|
||||||
})
|
|
||||||
req := &datapb.SegmentIDRequest{
|
|
||||||
Count: 1000,
|
|
||||||
ChannelName: channel0,
|
|
||||||
CollectionID: collID,
|
|
||||||
PartitionID: partID,
|
|
||||||
IsImport: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
|
||||||
NodeID: 0,
|
|
||||||
PeerRole: "",
|
|
||||||
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
|
|
||||||
assign := resp.SegIDAssignments[0]
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode())
|
|
||||||
assert.EqualValues(t, collID, assign.CollectionID)
|
|
||||||
assert.EqualValues(t, partID, assign.PartitionID)
|
|
||||||
assert.EqualValues(t, channel0, assign.ChannelName)
|
|
||||||
assert.EqualValues(t, 1000, assign.Count)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("with closed server", func(t *testing.T) {
|
|
||||||
req := &datapb.SegmentIDRequest{
|
|
||||||
Count: 100,
|
|
||||||
ChannelName: channel0,
|
|
||||||
CollectionID: collID,
|
|
||||||
PartitionID: partID,
|
|
||||||
}
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
closeTestServer(t, svr)
|
|
||||||
resp, err := svr.AssignSegmentID(context.Background(), &datapb.AssignSegmentIDRequest{
|
|
||||||
NodeID: 0,
|
|
||||||
PeerRole: "",
|
|
||||||
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("assign segment with invalid collection", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
svr.rootCoordClient = &mockRootCoord{
|
|
||||||
RootCoordClient: svr.rootCoordClient,
|
|
||||||
collID: collID,
|
|
||||||
}
|
|
||||||
|
|
||||||
schema := newTestSchema()
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: collID,
|
|
||||||
Schema: schema,
|
|
||||||
Partitions: []int64{},
|
|
||||||
})
|
|
||||||
req := &datapb.SegmentIDRequest{
|
|
||||||
Count: 1000,
|
|
||||||
ChannelName: channel0,
|
|
||||||
CollectionID: collIDInvalid,
|
|
||||||
PartitionID: partID,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
|
||||||
NodeID: 0,
|
|
||||||
PeerRole: "",
|
|
||||||
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 0, len(resp.SegIDAssignments))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockRootCoord struct {
|
type mockRootCoord struct {
|
||||||
types.RootCoordClient
|
types.RootCoordClient
|
||||||
collID UniqueID
|
collID UniqueID
|
||||||
|
@ -248,126 +117,6 @@ func (r *mockRootCoord) ReportImport(ctx context.Context, req *rootcoordpb.Impor
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFlush(t *testing.T) {
|
|
||||||
req := &datapb.FlushRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
MsgType: commonpb.MsgType_Flush,
|
|
||||||
MsgID: 0,
|
|
||||||
Timestamp: 0,
|
|
||||||
SourceID: 0,
|
|
||||||
},
|
|
||||||
DbID: 0,
|
|
||||||
CollectionID: 0,
|
|
||||||
}
|
|
||||||
t.Run("normal case", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
schema := newTestSchema()
|
|
||||||
svr.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
|
|
||||||
allocations, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, len(allocations))
|
|
||||||
expireTs := allocations[0].ExpireTime
|
|
||||||
segID := allocations[0].SegmentID
|
|
||||||
|
|
||||||
resp, err := svr.Flush(context.TODO(), req)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
||||||
|
|
||||||
svr.meta.SetCurrentRows(segID, 1)
|
|
||||||
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, len(ids))
|
|
||||||
assert.EqualValues(t, segID, ids[0])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("bulkload segment", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
schema := newTestSchema()
|
|
||||||
svr.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
|
|
||||||
|
|
||||||
allocations, err := svr.segmentManager.allocSegmentForImport(context.TODO(), 0, 1, "channel-1", 1, 100)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
expireTs := allocations.ExpireTime
|
|
||||||
segID := allocations.SegmentID
|
|
||||||
|
|
||||||
resp, err := svr.Flush(context.TODO(), req)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
||||||
assert.EqualValues(t, 0, len(resp.SegmentIDs))
|
|
||||||
// should not flush anything since this is a normal flush
|
|
||||||
svr.meta.SetCurrentRows(segID, 1)
|
|
||||||
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 0, len(ids))
|
|
||||||
|
|
||||||
req := &datapb.FlushRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
MsgType: commonpb.MsgType_Flush,
|
|
||||||
MsgID: 0,
|
|
||||||
Timestamp: 0,
|
|
||||||
SourceID: 0,
|
|
||||||
},
|
|
||||||
DbID: 0,
|
|
||||||
CollectionID: 0,
|
|
||||||
IsImport: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err = svr.Flush(context.TODO(), req)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
||||||
assert.EqualValues(t, 1, len(resp.SegmentIDs))
|
|
||||||
|
|
||||||
ids, err = svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, len(ids))
|
|
||||||
assert.EqualValues(t, segID, ids[0])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("closed server", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
closeTestServer(t, svr)
|
|
||||||
resp, err := svr.Flush(context.Background(), req)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test rolling upgrade", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
closeTestServer(t, svr)
|
|
||||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
|
||||||
sm := NewSessionManagerImpl()
|
|
||||||
|
|
||||||
datanodeClient := mocks.NewMockDataNodeClient(t)
|
|
||||||
datanodeClient.EXPECT().FlushChannels(mock.Anything, mock.Anything).Return(nil,
|
|
||||||
merr.WrapErrServiceUnimplemented(grpcStatus.Error(codes.Unimplemented, "mock grpc unimplemented error")))
|
|
||||||
|
|
||||||
sm.sessions = struct {
|
|
||||||
sync.RWMutex
|
|
||||||
data map[int64]*Session
|
|
||||||
}{data: map[int64]*Session{1: {
|
|
||||||
client: datanodeClient,
|
|
||||||
clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
|
||||||
return datanodeClient, nil
|
|
||||||
},
|
|
||||||
}}}
|
|
||||||
|
|
||||||
svr.sessionManager = sm
|
|
||||||
svr.cluster = NewClusterImpl(sm, svr.channelManager)
|
|
||||||
|
|
||||||
err := svr.channelManager.AddNode(1)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.Flush(context.TODO(), req)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
|
||||||
assert.Equal(t, Timestamp(0), resp.GetFlushTs())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// func TestGetComponentStates(t *testing.T) {
|
// func TestGetComponentStates(t *testing.T) {
|
||||||
// svr := newTestServer(t)
|
// svr := newTestServer(t)
|
||||||
// defer closeTestServer(t, svr)
|
// defer closeTestServer(t, svr)
|
||||||
|
@ -1280,435 +1029,6 @@ func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel s
|
||||||
s.spyCh <- struct{}{}
|
s.spyCh <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSaveBinlogPaths(t *testing.T) {
|
|
||||||
t.Run("Normal SaveRequest", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
// vecFieldID := int64(201)
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
segments := []struct {
|
|
||||||
id UniqueID
|
|
||||||
collectionID UniqueID
|
|
||||||
}{
|
|
||||||
{0, 0},
|
|
||||||
{1, 0},
|
|
||||||
}
|
|
||||||
for _, segment := range segments {
|
|
||||||
s := &datapb.SegmentInfo{
|
|
||||||
ID: segment.id,
|
|
||||||
CollectionID: segment.collectionID,
|
|
||||||
InsertChannel: "ch1",
|
|
||||||
State: commonpb.SegmentState_Growing,
|
|
||||||
}
|
|
||||||
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
|
||||||
},
|
|
||||||
SegmentID: 1,
|
|
||||||
CollectionID: 0,
|
|
||||||
Channel: "ch1",
|
|
||||||
Field2BinlogPaths: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Field2StatslogPaths: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test_stats/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test_stats/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
CheckPoints: []*datapb.CheckPoint{
|
|
||||||
{
|
|
||||||
SegmentID: 1,
|
|
||||||
Position: &msgpb.MsgPosition{
|
|
||||||
ChannelName: "ch1",
|
|
||||||
MsgID: []byte{1, 2, 3},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
NumOfRows: 12,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Flushed: false,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
|
|
||||||
segment := svr.meta.GetHealthySegment(1)
|
|
||||||
assert.NotNil(t, segment)
|
|
||||||
binlogs := segment.GetBinlogs()
|
|
||||||
assert.EqualValues(t, 1, len(binlogs))
|
|
||||||
fieldBinlogs := binlogs[0]
|
|
||||||
assert.NotNil(t, fieldBinlogs)
|
|
||||||
assert.EqualValues(t, 2, len(fieldBinlogs.GetBinlogs()))
|
|
||||||
assert.EqualValues(t, 1, fieldBinlogs.GetFieldID())
|
|
||||||
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
|
|
||||||
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
|
|
||||||
|
|
||||||
assert.EqualValues(t, segment.DmlPosition.ChannelName, "ch1")
|
|
||||||
assert.EqualValues(t, segment.DmlPosition.MsgID, []byte{1, 2, 3})
|
|
||||||
assert.EqualValues(t, segment.NumOfRows, 10)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Normal L0 SaveRequest", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
// vecFieldID := int64(201)
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
|
||||||
},
|
|
||||||
SegmentID: 1,
|
|
||||||
PartitionID: 1,
|
|
||||||
CollectionID: 0,
|
|
||||||
SegLevel: datapb.SegmentLevel_L0,
|
|
||||||
Deltalogs: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
CheckPoints: []*datapb.CheckPoint{
|
|
||||||
{
|
|
||||||
SegmentID: 1,
|
|
||||||
Position: &msgpb.MsgPosition{
|
|
||||||
ChannelName: "ch1",
|
|
||||||
MsgID: []byte{1, 2, 3},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
NumOfRows: 12,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Flushed: true,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
|
|
||||||
segment := svr.meta.GetHealthySegment(1)
|
|
||||||
assert.NotNil(t, segment)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("SaveDroppedSegment", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
// vecFieldID := int64(201)
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
segments := []struct {
|
|
||||||
id UniqueID
|
|
||||||
collectionID UniqueID
|
|
||||||
}{
|
|
||||||
{0, 0},
|
|
||||||
{1, 0},
|
|
||||||
}
|
|
||||||
for _, segment := range segments {
|
|
||||||
s := &datapb.SegmentInfo{
|
|
||||||
ID: segment.id,
|
|
||||||
CollectionID: segment.collectionID,
|
|
||||||
InsertChannel: "ch1",
|
|
||||||
State: commonpb.SegmentState_Dropped,
|
|
||||||
}
|
|
||||||
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
|
||||||
},
|
|
||||||
SegmentID: 1,
|
|
||||||
CollectionID: 0,
|
|
||||||
Field2BinlogPaths: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
CheckPoints: []*datapb.CheckPoint{
|
|
||||||
{
|
|
||||||
SegmentID: 1,
|
|
||||||
Position: &msgpb.MsgPosition{
|
|
||||||
ChannelName: "ch1",
|
|
||||||
MsgID: []byte{1, 2, 3},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
NumOfRows: 12,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Flushed: false,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
|
|
||||||
|
|
||||||
segment := svr.meta.GetSegment(1)
|
|
||||||
assert.NotNil(t, segment)
|
|
||||||
binlogs := segment.GetBinlogs()
|
|
||||||
assert.EqualValues(t, 0, len(binlogs))
|
|
||||||
assert.EqualValues(t, segment.NumOfRows, 0)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("SaveUnhealthySegment", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
// vecFieldID := int64(201)
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
segments := []struct {
|
|
||||||
id UniqueID
|
|
||||||
collectionID UniqueID
|
|
||||||
}{
|
|
||||||
{0, 0},
|
|
||||||
{1, 0},
|
|
||||||
}
|
|
||||||
for _, segment := range segments {
|
|
||||||
s := &datapb.SegmentInfo{
|
|
||||||
ID: segment.id,
|
|
||||||
CollectionID: segment.collectionID,
|
|
||||||
InsertChannel: "ch1",
|
|
||||||
State: commonpb.SegmentState_NotExist,
|
|
||||||
}
|
|
||||||
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
|
||||||
},
|
|
||||||
SegmentID: 1,
|
|
||||||
CollectionID: 0,
|
|
||||||
Field2BinlogPaths: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
CheckPoints: []*datapb.CheckPoint{
|
|
||||||
{
|
|
||||||
SegmentID: 1,
|
|
||||||
Position: &msgpb.MsgPosition{
|
|
||||||
ChannelName: "ch1",
|
|
||||||
MsgID: []byte{1, 2, 3},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
NumOfRows: 12,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Flushed: false,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("SaveNotExistSegment", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
// vecFieldID := int64(201)
|
|
||||||
svr.meta.AddCollection(&collectionInfo{
|
|
||||||
ID: 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
|
||||||
Base: &commonpb.MsgBase{
|
|
||||||
Timestamp: uint64(time.Now().Unix()),
|
|
||||||
},
|
|
||||||
SegmentID: 1,
|
|
||||||
CollectionID: 0,
|
|
||||||
Field2BinlogPaths: []*datapb.FieldBinlog{
|
|
||||||
{
|
|
||||||
FieldID: 1,
|
|
||||||
Binlogs: []*datapb.Binlog{
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
|
||||||
EntriesNum: 5,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
CheckPoints: []*datapb.CheckPoint{
|
|
||||||
{
|
|
||||||
SegmentID: 1,
|
|
||||||
Position: &msgpb.MsgPosition{
|
|
||||||
ChannelName: "ch1",
|
|
||||||
MsgID: []byte{1, 2, 3},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
NumOfRows: 12,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Flushed: false,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("with channel not matched", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
err := svr.channelManager.AddNode(0)
|
|
||||||
require.Nil(t, err)
|
|
||||||
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0})
|
|
||||||
require.Nil(t, err)
|
|
||||||
s := &datapb.SegmentInfo{
|
|
||||||
ID: 1,
|
|
||||||
InsertChannel: "ch2",
|
|
||||||
State: commonpb.SegmentState_Growing,
|
|
||||||
}
|
|
||||||
svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s))
|
|
||||||
|
|
||||||
resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{
|
|
||||||
SegmentID: 1,
|
|
||||||
Channel: "test",
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp), merr.ErrChannelNotFound)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("with closed server", func(t *testing.T) {
|
|
||||||
svr := newTestServer(t, nil)
|
|
||||||
closeTestServer(t, svr)
|
|
||||||
resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
|
|
||||||
})
|
|
||||||
/*
|
|
||||||
t.Run("test save dropped segment and remove channel", func(t *testing.T) {
|
|
||||||
spyCh := make(chan struct{}, 1)
|
|
||||||
svr := newTestServer(t, nil, WithSegmentManager(&spySegmentManager{spyCh: spyCh}))
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
svr.meta.AddCollection(&collectionInfo{ID: 1})
|
|
||||||
err := svr.meta.AddSegment(&SegmentInfo{
|
|
||||||
Segment: &datapb.SegmentInfo{
|
|
||||||
ID: 1,
|
|
||||||
CollectionID: 1,
|
|
||||||
InsertChannel: "ch1",
|
|
||||||
State: commonpb.SegmentState_Growing,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
err = svr.channelManager.AddNode(0)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 1})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
|
|
||||||
SegmentID: 1,
|
|
||||||
Dropped: true,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
<-spyCh
|
|
||||||
})*/
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDropVirtualChannel(t *testing.T) {
|
func TestDropVirtualChannel(t *testing.T) {
|
||||||
t.Run("normal DropVirtualChannel", func(t *testing.T) {
|
t.Run("normal DropVirtualChannel", func(t *testing.T) {
|
||||||
spyCh := make(chan struct{}, 1)
|
spyCh := make(chan struct{}, 1)
|
||||||
|
@ -3427,243 +2747,6 @@ func TestPostFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetFlushState(t *testing.T) {
|
|
||||||
t.Run("get flush state with all flushed segments", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 1,
|
|
||||||
State: commonpb.SegmentState_Flushed,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 2,
|
|
||||||
State: commonpb.SegmentState_Flushed,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
var (
|
|
||||||
vchannel = "ch1"
|
|
||||||
collection = int64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
svr.channelManager = &ChannelManagerImpl{
|
|
||||||
store: &ChannelStore{
|
|
||||||
channelsInfo: map[int64]*NodeChannelInfo{
|
|
||||||
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{
|
|
||||||
MsgID: []byte{1},
|
|
||||||
Timestamp: 12,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: true,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("get flush state with unflushed segments", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 1,
|
|
||||||
State: commonpb.SegmentState_Flushed,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 2,
|
|
||||||
State: commonpb.SegmentState_Sealed,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
var (
|
|
||||||
vchannel = "ch1"
|
|
||||||
collection = int64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
svr.channelManager = &ChannelManagerImpl{
|
|
||||||
store: &ChannelStore{
|
|
||||||
channelsInfo: map[int64]*NodeChannelInfo{
|
|
||||||
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{
|
|
||||||
MsgID: []byte{1},
|
|
||||||
Timestamp: 12,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: false,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("get flush state with compacted segments", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 1,
|
|
||||||
State: commonpb.SegmentState_Flushed,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = meta.AddSegment(context.TODO(), &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{
|
|
||||||
ID: 2,
|
|
||||||
State: commonpb.SegmentState_Dropped,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
var (
|
|
||||||
vchannel = "ch1"
|
|
||||||
collection = int64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
svr.channelManager = &ChannelManagerImpl{
|
|
||||||
store: &ChannelStore{
|
|
||||||
channelsInfo: map[int64]*NodeChannelInfo{
|
|
||||||
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{
|
|
||||||
MsgID: []byte{1},
|
|
||||||
Timestamp: 12,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: true,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("channel flushed", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
var (
|
|
||||||
vchannel = "ch1"
|
|
||||||
collection = int64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
svr.channelManager = &ChannelManagerImpl{
|
|
||||||
store: &ChannelStore{
|
|
||||||
channelsInfo: map[int64]*NodeChannelInfo{
|
|
||||||
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{
|
|
||||||
MsgID: []byte{1},
|
|
||||||
Timestamp: 12,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{
|
|
||||||
FlushTs: 11,
|
|
||||||
CollectionID: collection,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: true,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("channel unflushed", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
var (
|
|
||||||
vchannel = "ch1"
|
|
||||||
collection = int64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
svr.channelManager = &ChannelManagerImpl{
|
|
||||||
store: &ChannelStore{
|
|
||||||
channelsInfo: map[int64]*NodeChannelInfo{
|
|
||||||
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{
|
|
||||||
MsgID: []byte{1},
|
|
||||||
Timestamp: 10,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{
|
|
||||||
FlushTs: 11,
|
|
||||||
CollectionID: collection,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: false,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("no channels", func(t *testing.T) {
|
|
||||||
meta, err := newMemoryMeta()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
|
||||||
defer closeTestServer(t, svr)
|
|
||||||
|
|
||||||
collection := int64(0)
|
|
||||||
|
|
||||||
resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{
|
|
||||||
FlushTs: 11,
|
|
||||||
CollectionID: collection,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
|
|
||||||
Status: merr.Success(),
|
|
||||||
Flushed: true,
|
|
||||||
}, resp)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFlushAllState(t *testing.T) {
|
func TestGetFlushAllState(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
testName string
|
testName string
|
||||||
|
@ -4273,11 +3356,6 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode))
|
assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode))
|
||||||
|
|
||||||
// Stop channal watch state watcher in tests
|
|
||||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
|
||||||
svr.channelManager.stopChecker()
|
|
||||||
}
|
|
||||||
|
|
||||||
return svr
|
return svr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4323,8 +3401,11 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Stop channal watch state watcher in tests
|
// Stop channal watch state watcher in tests
|
||||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
if svr.channelManager != nil {
|
||||||
svr.channelManager.stopChecker()
|
impl, ok := svr.channelManager.(*ChannelManagerImpl)
|
||||||
|
if ok && impl.stopChecker != nil {
|
||||||
|
impl.stopChecker()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return svr
|
return svr
|
||||||
|
@ -4376,8 +3457,11 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Stop channal watch state watcher in tests
|
// Stop channal watch state watcher in tests
|
||||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
if svr.channelManager != nil {
|
||||||
svr.channelManager.stopChecker()
|
impl, ok := svr.channelManager.(*ChannelManagerImpl)
|
||||||
|
if ok && impl.stopChecker != nil {
|
||||||
|
impl.stopChecker()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return svr
|
return svr
|
||||||
|
@ -4578,8 +3662,11 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {
|
||||||
assert.Equal(t, commonpb.StateCode_Healthy, resp.GetState().GetStateCode())
|
assert.Equal(t, commonpb.StateCode_Healthy, resp.GetState().GetStateCode())
|
||||||
|
|
||||||
// stop channal watch state watcher in tests
|
// stop channal watch state watcher in tests
|
||||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
if svr.channelManager != nil {
|
||||||
svr.channelManager.stopChecker()
|
impl, ok := svr.channelManager.(*ChannelManagerImpl)
|
||||||
|
if ok && impl.stopChecker != nil {
|
||||||
|
impl.stopChecker()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return svr
|
return svr
|
||||||
|
|
|
@ -1261,7 +1261,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
channels := s.channelManager.GetChannelNamesByCollectionID(req.GetCollectionID())
|
channels := s.channelManager.GetChannelsByCollectionID(req.GetCollectionID())
|
||||||
if len(channels) == 0 { // For compatibility with old client
|
if len(channels) == 0 { // For compatibility with old client
|
||||||
resp.Flushed = true
|
resp.Flushed = true
|
||||||
|
|
||||||
|
@ -1270,11 +1270,11 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, channel := range channels {
|
for _, channel := range channels {
|
||||||
cp := s.meta.GetChannelCheckpoint(channel)
|
cp := s.meta.GetChannelCheckpoint(channel.GetName())
|
||||||
if cp == nil || cp.GetTimestamp() < req.GetFlushTs() {
|
if cp == nil || cp.GetTimestamp() < req.GetFlushTs() {
|
||||||
resp.Flushed = false
|
resp.Flushed = false
|
||||||
|
|
||||||
log.RatedInfo(10, "GetFlushState failed, channel unflushed", zap.String("channel", channel),
|
log.RatedInfo(10, "GetFlushState failed, channel unflushed", zap.String("channel", channel.GetName()),
|
||||||
zap.Time("CP", tsoutil.PhysicalTime(cp.GetTimestamp())),
|
zap.Time("CP", tsoutil.PhysicalTime(cp.GetTimestamp())),
|
||||||
zap.Duration("lag", tsoutil.PhysicalTime(req.GetFlushTs()).Sub(tsoutil.PhysicalTime(cp.GetTimestamp()))))
|
zap.Duration("lag", tsoutil.PhysicalTime(req.GetFlushTs()).Sub(tsoutil.PhysicalTime(cp.GetTimestamp()))))
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
|
|
@ -3,23 +3,660 @@ package datacoord
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
grpcStatus "google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
"github.com/milvus-io/milvus/internal/mocks"
|
"github.com/milvus-io/milvus/internal/mocks"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ServerSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
testServer *Server
|
||||||
|
mockChMgr *MockChannelManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) SetupTest() {
|
||||||
|
s.testServer = newTestServer(s.T(), nil)
|
||||||
|
if s.testServer.channelManager != nil {
|
||||||
|
s.testServer.channelManager.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mockChMgr = NewMockChannelManager(s.T())
|
||||||
|
s.testServer.channelManager = s.mockChMgr
|
||||||
|
if s.mockChMgr != nil {
|
||||||
|
s.mockChMgr.EXPECT().Close().Maybe()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TearDownTest() {
|
||||||
|
if s.testServer != nil {
|
||||||
|
log.Info("ServerSuite tears down test", zap.String("name", s.T().Name()))
|
||||||
|
closeTestServer(s.T(), s.testServer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(ServerSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestGetFlushState_ByFlushTs() {
|
||||||
|
s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(0)).
|
||||||
|
Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3)
|
||||||
|
|
||||||
|
s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(1)).Return(nil).Times(1)
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
inTs Timestamp
|
||||||
|
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{"channel cp > flush ts", 11, true},
|
||||||
|
{"channel cp = flush ts", 12, true},
|
||||||
|
{"channel cp < flush ts", 13, false},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.testServer.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{
|
||||||
|
MsgID: []byte{1},
|
||||||
|
Timestamp: 12,
|
||||||
|
})
|
||||||
|
s.Require().NoError(err)
|
||||||
|
for _, test := range tests {
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{FlushTs: test.inTs})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(&milvuspb.GetFlushStateResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
Flushed: test.expected,
|
||||||
|
}, resp)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{CollectionID: 1, FlushTs: 13})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(&milvuspb.GetFlushStateResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
Flushed: true,
|
||||||
|
}, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestGetFlushState_BySegment() {
|
||||||
|
s.mockChMgr.EXPECT().GetChannelsByCollectionID(mock.Anything).
|
||||||
|
Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
segID int64
|
||||||
|
state commonpb.SegmentState
|
||||||
|
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{"flushed seg1", 1, commonpb.SegmentState_Flushed, true},
|
||||||
|
{"flushed seg2", 2, commonpb.SegmentState_Flushed, true},
|
||||||
|
{"sealed seg3", 3, commonpb.SegmentState_Sealed, false},
|
||||||
|
{"compacted/dropped seg4", 4, commonpb.SegmentState_Dropped, true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
err := s.testServer.meta.AddSegment(context.TODO(), &SegmentInfo{
|
||||||
|
SegmentInfo: &datapb.SegmentInfo{
|
||||||
|
ID: test.segID,
|
||||||
|
State: test.state,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Require().NoError(err)
|
||||||
|
err = s.testServer.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{
|
||||||
|
MsgID: []byte{1},
|
||||||
|
Timestamp: 12,
|
||||||
|
})
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{test.segID}})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(&milvuspb.GetFlushStateResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
Flushed: test.expected,
|
||||||
|
}, resp)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_ClosedServer() {
|
||||||
|
s.TearDownTest()
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{
|
||||||
|
SegmentID: 1,
|
||||||
|
Channel: "test",
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_ChannelNotMatch() {
|
||||||
|
s.mockChMgr.EXPECT().Match(mock.Anything, mock.Anything).Return(false)
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{
|
||||||
|
SegmentID: 1,
|
||||||
|
Channel: "test",
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp), merr.ErrChannelNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_SaveUnhealthySegment() {
|
||||||
|
s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true)
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
|
||||||
|
|
||||||
|
segments := map[int64]commonpb.SegmentState{
|
||||||
|
0: commonpb.SegmentState_NotExist,
|
||||||
|
}
|
||||||
|
for segID, state := range segments {
|
||||||
|
info := &datapb.SegmentInfo{
|
||||||
|
ID: segID,
|
||||||
|
InsertChannel: "ch1",
|
||||||
|
State: state,
|
||||||
|
}
|
||||||
|
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
|
||||||
|
s.Require().NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
SegmentID: 1,
|
||||||
|
Channel: "ch1",
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound)
|
||||||
|
|
||||||
|
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
SegmentID: 2,
|
||||||
|
Channel: "ch1",
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
|
||||||
|
s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true)
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
|
||||||
|
|
||||||
|
segments := map[int64]int64{
|
||||||
|
0: 0,
|
||||||
|
1: 0,
|
||||||
|
}
|
||||||
|
for segID, collID := range segments {
|
||||||
|
info := &datapb.SegmentInfo{
|
||||||
|
ID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
InsertChannel: "ch1",
|
||||||
|
State: commonpb.SegmentState_Dropped,
|
||||||
|
}
|
||||||
|
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
|
||||||
|
s.Require().NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
SegmentID: 1,
|
||||||
|
CollectionID: 0,
|
||||||
|
Channel: "ch1",
|
||||||
|
Flushed: false,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
|
||||||
|
|
||||||
|
segment := s.testServer.meta.GetSegment(1)
|
||||||
|
s.NotNil(segment)
|
||||||
|
s.EqualValues(0, len(segment.GetBinlogs()))
|
||||||
|
s.EqualValues(segment.NumOfRows, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_L0Segment() {
|
||||||
|
s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true)
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
|
||||||
|
|
||||||
|
segment := s.testServer.meta.GetHealthySegment(1)
|
||||||
|
s.Require().Nil(segment)
|
||||||
|
ctx := context.Background()
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
SegmentID: 1,
|
||||||
|
PartitionID: 1,
|
||||||
|
CollectionID: 0,
|
||||||
|
SegLevel: datapb.SegmentLevel_L0,
|
||||||
|
Channel: "ch1",
|
||||||
|
Deltalogs: []*datapb.FieldBinlog{
|
||||||
|
{
|
||||||
|
FieldID: 1,
|
||||||
|
Binlogs: []*datapb.Binlog{
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
CheckPoints: []*datapb.CheckPoint{
|
||||||
|
{
|
||||||
|
SegmentID: 1,
|
||||||
|
Position: &msgpb.MsgPosition{
|
||||||
|
ChannelName: "ch1",
|
||||||
|
MsgID: []byte{1, 2, 3},
|
||||||
|
MsgGroup: "",
|
||||||
|
Timestamp: 0,
|
||||||
|
},
|
||||||
|
NumOfRows: 12,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Flushed: true,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
|
||||||
|
|
||||||
|
segment = s.testServer.meta.GetHealthySegment(1)
|
||||||
|
s.NotNil(segment)
|
||||||
|
s.EqualValues(datapb.SegmentLevel_L0, segment.GetLevel())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
|
||||||
|
s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true)
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
|
||||||
|
|
||||||
|
segments := map[int64]int64{
|
||||||
|
0: 0,
|
||||||
|
1: 0,
|
||||||
|
}
|
||||||
|
for segID, collID := range segments {
|
||||||
|
info := &datapb.SegmentInfo{
|
||||||
|
ID: segID,
|
||||||
|
CollectionID: collID,
|
||||||
|
InsertChannel: "ch1",
|
||||||
|
State: commonpb.SegmentState_Growing,
|
||||||
|
}
|
||||||
|
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
|
||||||
|
s.Require().NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
Timestamp: uint64(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
SegmentID: 1,
|
||||||
|
CollectionID: 0,
|
||||||
|
Channel: "ch1",
|
||||||
|
Field2BinlogPaths: []*datapb.FieldBinlog{
|
||||||
|
{
|
||||||
|
FieldID: 1,
|
||||||
|
Binlogs: []*datapb.Binlog{
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Field2StatslogPaths: []*datapb.FieldBinlog{
|
||||||
|
{
|
||||||
|
FieldID: 1,
|
||||||
|
Binlogs: []*datapb.Binlog{
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test_stats/0/1/1/1/Allo1",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LogPath: "/by-dev/test_stats/0/1/1/1/Allo2",
|
||||||
|
EntriesNum: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
CheckPoints: []*datapb.CheckPoint{
|
||||||
|
{
|
||||||
|
SegmentID: 1,
|
||||||
|
Position: &msgpb.MsgPosition{
|
||||||
|
ChannelName: "ch1",
|
||||||
|
MsgID: []byte{1, 2, 3},
|
||||||
|
MsgGroup: "",
|
||||||
|
Timestamp: 0,
|
||||||
|
},
|
||||||
|
NumOfRows: 12,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Flushed: false,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
|
||||||
|
|
||||||
|
segment := s.testServer.meta.GetHealthySegment(1)
|
||||||
|
s.NotNil(segment)
|
||||||
|
binlogs := segment.GetBinlogs()
|
||||||
|
s.EqualValues(1, len(binlogs))
|
||||||
|
fieldBinlogs := binlogs[0]
|
||||||
|
s.NotNil(fieldBinlogs)
|
||||||
|
s.EqualValues(2, len(fieldBinlogs.GetBinlogs()))
|
||||||
|
s.EqualValues(1, fieldBinlogs.GetFieldID())
|
||||||
|
s.EqualValues("/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
|
||||||
|
s.EqualValues("/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
|
||||||
|
|
||||||
|
s.EqualValues(segment.DmlPosition.ChannelName, "ch1")
|
||||||
|
s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3})
|
||||||
|
s.EqualValues(segment.NumOfRows, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestFlush_NormalCase() {
|
||||||
|
req := &datapb.FlushRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_Flush,
|
||||||
|
MsgID: 0,
|
||||||
|
Timestamp: 0,
|
||||||
|
SourceID: 0,
|
||||||
|
},
|
||||||
|
DbID: 0,
|
||||||
|
CollectionID: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{
|
||||||
|
1: {"channel-1"},
|
||||||
|
})
|
||||||
|
|
||||||
|
mockCluster := NewMockCluster(s.T())
|
||||||
|
mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return(nil)
|
||||||
|
mockCluster.EXPECT().Close().Maybe()
|
||||||
|
s.testServer.cluster = mockCluster
|
||||||
|
|
||||||
|
schema := newTestSchema()
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
|
||||||
|
allocations, err := s.testServer.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(1, len(allocations))
|
||||||
|
expireTs := allocations[0].ExpireTime
|
||||||
|
segID := allocations[0].SegmentID
|
||||||
|
|
||||||
|
resp, err := s.testServer.Flush(context.TODO(), req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
|
|
||||||
|
s.testServer.meta.SetCurrentRows(segID, 1)
|
||||||
|
ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(1, len(ids))
|
||||||
|
s.EqualValues(segID, ids[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestFlush_BulkLoadSegment() {
|
||||||
|
req := &datapb.FlushRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_Flush,
|
||||||
|
MsgID: 0,
|
||||||
|
Timestamp: 0,
|
||||||
|
SourceID: 0,
|
||||||
|
},
|
||||||
|
DbID: 0,
|
||||||
|
CollectionID: 0,
|
||||||
|
}
|
||||||
|
s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{
|
||||||
|
1: {"channel-1"},
|
||||||
|
}).Twice()
|
||||||
|
|
||||||
|
mockCluster := NewMockCluster(s.T())
|
||||||
|
mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return(nil).Twice()
|
||||||
|
mockCluster.EXPECT().Close().Maybe()
|
||||||
|
s.testServer.cluster = mockCluster
|
||||||
|
|
||||||
|
schema := newTestSchema()
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
|
||||||
|
|
||||||
|
allocations, err := s.testServer.segmentManager.allocSegmentForImport(context.TODO(), 0, 1, "channel-1", 1, 100)
|
||||||
|
s.NoError(err)
|
||||||
|
expireTs := allocations.ExpireTime
|
||||||
|
segID := allocations.SegmentID
|
||||||
|
|
||||||
|
resp, err := s.testServer.Flush(context.TODO(), req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(0, len(resp.SegmentIDs))
|
||||||
|
// should not flush anything since this is a normal flush
|
||||||
|
s.testServer.meta.SetCurrentRows(segID, 1)
|
||||||
|
ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(0, len(ids))
|
||||||
|
|
||||||
|
req = &datapb.FlushRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_Flush,
|
||||||
|
MsgID: 0,
|
||||||
|
Timestamp: 0,
|
||||||
|
SourceID: 0,
|
||||||
|
},
|
||||||
|
DbID: 0,
|
||||||
|
CollectionID: 0,
|
||||||
|
IsImport: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err = s.testServer.Flush(context.TODO(), req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(1, len(resp.SegmentIDs))
|
||||||
|
|
||||||
|
ids, err = s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(1, len(ids))
|
||||||
|
s.EqualValues(segID, ids[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestFlush_ClosedServer() {
|
||||||
|
s.TearDownTest()
|
||||||
|
req := &datapb.FlushRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_Flush,
|
||||||
|
MsgID: 0,
|
||||||
|
Timestamp: 0,
|
||||||
|
SourceID: 0,
|
||||||
|
},
|
||||||
|
DbID: 0,
|
||||||
|
CollectionID: 0,
|
||||||
|
}
|
||||||
|
resp, err := s.testServer.Flush(context.Background(), req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestFlush_RollingUpgrade() {
|
||||||
|
req := &datapb.FlushRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_Flush,
|
||||||
|
MsgID: 0,
|
||||||
|
Timestamp: 0,
|
||||||
|
SourceID: 0,
|
||||||
|
},
|
||||||
|
DbID: 0,
|
||||||
|
CollectionID: 0,
|
||||||
|
}
|
||||||
|
mockCluster := NewMockCluster(s.T())
|
||||||
|
mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return(merr.WrapErrServiceUnimplemented(grpcStatus.Error(codes.Unimplemented, "mock grpc unimplemented error")))
|
||||||
|
mockCluster.EXPECT().Close().Maybe()
|
||||||
|
s.testServer.cluster = mockCluster
|
||||||
|
s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{
|
||||||
|
1: {"channel-1"},
|
||||||
|
}).Once()
|
||||||
|
|
||||||
|
resp, err := s.testServer.Flush(context.TODO(), req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(0, resp.GetFlushTs())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestGetSegmentInfoChannel() {
|
||||||
|
resp, err := s.testServer.GetSegmentInfoChannel(context.TODO(), nil)
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(Params.CommonCfg.DataCoordSegmentInfo.GetValue(), resp.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServerSuite) TestAssignSegmentID() {
|
||||||
|
s.TearDownTest()
|
||||||
|
const collID = 100
|
||||||
|
const collIDInvalid = 101
|
||||||
|
const partID = 0
|
||||||
|
const channel0 = "channel0"
|
||||||
|
|
||||||
|
s.Run("assign segment normally", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
defer s.TearDownTest()
|
||||||
|
|
||||||
|
schema := newTestSchema()
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{
|
||||||
|
ID: collID,
|
||||||
|
Schema: schema,
|
||||||
|
Partitions: []int64{},
|
||||||
|
})
|
||||||
|
req := &datapb.SegmentIDRequest{
|
||||||
|
Count: 1000,
|
||||||
|
ChannelName: channel0,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
||||||
|
NodeID: 0,
|
||||||
|
PeerRole: "",
|
||||||
|
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(1, len(resp.SegIDAssignments))
|
||||||
|
assign := resp.SegIDAssignments[0]
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(collID, assign.CollectionID)
|
||||||
|
s.EqualValues(partID, assign.PartitionID)
|
||||||
|
s.EqualValues(channel0, assign.ChannelName)
|
||||||
|
s.EqualValues(1000, assign.Count)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("assign segment for bulkload", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
defer s.TearDownTest()
|
||||||
|
|
||||||
|
schema := newTestSchema()
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{
|
||||||
|
ID: collID,
|
||||||
|
Schema: schema,
|
||||||
|
Partitions: []int64{},
|
||||||
|
})
|
||||||
|
req := &datapb.SegmentIDRequest{
|
||||||
|
Count: 1000,
|
||||||
|
ChannelName: channel0,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
IsImport: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
||||||
|
NodeID: 0,
|
||||||
|
PeerRole: "",
|
||||||
|
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(1, len(resp.SegIDAssignments))
|
||||||
|
assign := resp.SegIDAssignments[0]
|
||||||
|
s.EqualValues(commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode())
|
||||||
|
s.EqualValues(collID, assign.CollectionID)
|
||||||
|
s.EqualValues(partID, assign.PartitionID)
|
||||||
|
s.EqualValues(channel0, assign.ChannelName)
|
||||||
|
s.EqualValues(1000, assign.Count)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("with closed server", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
s.TearDownTest()
|
||||||
|
|
||||||
|
req := &datapb.SegmentIDRequest{
|
||||||
|
Count: 100,
|
||||||
|
ChannelName: channel0,
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partID,
|
||||||
|
}
|
||||||
|
resp, err := s.testServer.AssignSegmentID(context.Background(), &datapb.AssignSegmentIDRequest{
|
||||||
|
NodeID: 0,
|
||||||
|
PeerRole: "",
|
||||||
|
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("assign segment with invalid collection", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
defer s.TearDownTest()
|
||||||
|
|
||||||
|
s.testServer.rootCoordClient = &mockRootCoord{
|
||||||
|
RootCoordClient: s.testServer.rootCoordClient,
|
||||||
|
collID: collID,
|
||||||
|
}
|
||||||
|
|
||||||
|
schema := newTestSchema()
|
||||||
|
s.testServer.meta.AddCollection(&collectionInfo{
|
||||||
|
ID: collID,
|
||||||
|
Schema: schema,
|
||||||
|
Partitions: []int64{},
|
||||||
|
})
|
||||||
|
req := &datapb.SegmentIDRequest{
|
||||||
|
Count: 1000,
|
||||||
|
ChannelName: channel0,
|
||||||
|
CollectionID: collIDInvalid,
|
||||||
|
PartitionID: partID,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
|
||||||
|
NodeID: 0,
|
||||||
|
PeerRole: "",
|
||||||
|
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.EqualValues(0, len(resp.SegIDAssignments))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestBroadcastAlteredCollection(t *testing.T) {
|
func TestBroadcastAlteredCollection(t *testing.T) {
|
||||||
t.Run("test server is closed", func(t *testing.T) {
|
t.Run("test server is closed", func(t *testing.T) {
|
||||||
s := &Server{}
|
s := &Server{}
|
||||||
|
|
Loading…
Reference in New Issue