From 128c66f30163ce535c123bc0c7d83097d6d0ca94 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 25 Jul 2022 14:14:31 +0800
Subject: [PATCH] Fix segment DmChannel in querycoord for compatibility to 2.0.2 (#18397)

Reload segment dm channel if empty channel found

Patch originally authored by @czs007
Unit test added by @congqixia

Co-authored-by: zhenshan.cao
Signed-off-by: Congqi Xia
Co-authored-by: zhenshan.cao
---
 internal/querycoord/meta.go        |  53 ++++++-
 internal/querycoord/meta_test.go   | 230 +++++++++++++++++++++++++++++
 internal/querycoord/query_coord.go |  11 ++
 3 files changed, 293 insertions(+), 1 deletion(-)

diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go
index 08b682f3d4..5168f91eaf 100644
--- a/internal/querycoord/meta.go
+++ b/internal/querycoord/meta.go
@@ -95,6 +95,8 @@ type Meta interface {
 	updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error
 }
 
+var _ Meta = (*MetaReplica)(nil)
+
 // MetaReplica records the current load information on all querynodes
 type MetaReplica struct {
 	ctx context.Context
@@ -149,6 +151,56 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl
 	return m, nil
 }
 
+// fixSegmentInfoDMChannel backfills the DmChannel of every segment in m.segmentsInfo
+// whose channel is empty, using the insert channel DataCoord reports for that segment,
+// and saves each patched segment. It stops at, and returns, the first error.
+func (m *MetaReplica) fixSegmentInfoDMChannel() error {
+	var segmentIDs []UniqueID
+	for id, info := range m.segmentsInfo.segmentIDMap {
+		if info.GetDmChannel() == "" {
+			segmentIDs = append(segmentIDs, id)
+		}
+	}
+
+	if len(segmentIDs) == 0 {
+		log.Info("QueryCoord MetaReplica no need to fix SegmentInfo DmChannel")
+		return nil
+	}
+
+	// query DataCoord (unhealthy segments included) for the channels of the flawed segments
+	infoResp, err := m.dataCoord.GetSegmentInfo(m.ctx, &datapb.GetSegmentInfoRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_SegmentInfo,
+		},
+		SegmentIDs:       segmentIDs,
+		IncludeUnHealthy: true,
+	})
+	if err != nil {
+		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
+		return err
+	}
+	if infoResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
+		err = errors.New(infoResp.GetStatus().GetReason())
+		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
+		return err
+	}
+
+	for _, newInfo := range infoResp.GetInfos() {
+		segment, err := m.getSegmentInfoByID(newInfo.GetID())
+		if err != nil {
+			log.Warn("failed to find original patched segment", zap.Int64("segmentID", newInfo.GetID()), zap.Error(err))
+			return err
+		}
+		segment.DmChannel = newInfo.GetInsertChannel()
+		err = m.segmentsInfo.saveSegment(segment)
+		if err != nil {
+			log.Warn("failed to save patched segment", zap.Int64("segmentID", newInfo.GetID()), zap.Error(err))
+			return err
+		}
+	}
+	return nil
+}
+
 func (m *MetaReplica) reloadFromKV() error {
 	log.Info("recovery collections...")
 	collectionKeys, collectionValues, err := m.getKvClient().LoadWithPrefix(collectionMetaPrefix)
@@ -283,7 +335,6 @@ func (m *MetaReplica) reloadFromKV() error {
 
 	//TODO::update partition states
 	log.Info("reload from kv finished")
-
 	return nil
 }
 
diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go
index 6bad3c95e2..2114db9e78 100644
--- a/internal/querycoord/meta_test.go
+++ b/internal/querycoord/meta_test.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
@@ -35,6 +36,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/etcd"
 )
@@ -645,3 +647,231 @@ func MockSaveSegments(segmentNum int) col2SegmentInfos {
 
 	return saves
 }
+
+// mockKV is a kv.MetaKv test double; only Save is mocked.
+type mockKV struct {
+	kv.MetaKv
+	mock.Mock
+}
+
+func (m *mockKV) Save(k, v string) error {
+	args := m.Called(k, v)
+	return args.Error(0)
+}
+
+// mockDataCoord is a types.DataCoord test double; only GetSegmentInfo is mocked.
+type mockDataCoord struct {
+	types.DataCoord
+	mock.Mock
+}
+
+func (m *mockDataCoord) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
+	args := m.Called(ctx, req)
+	return args.Get(0).(*datapb.GetSegmentInfoResponse), args.Error(1)
+}
+
+// TestFixSegmentInfoDmlChannel covers the success and failure paths of fixSegmentInfoDMChannel.
+func TestFixSegmentInfoDmlChannel(t *testing.T) {
+
+	t.Run("No flawed segments", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: "dml_channel_01"}
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.NoError(t, err)
+
+		mkv.Mock.AssertNotCalled(t, "Save", mock.AnythingOfType("string"), mock.AnythingOfType("string"))
+		dc.Mock.AssertNotCalled(t, "GetSegmentInfo", mock.Anything, mock.Anything)
+	})
+
+	t.Run("with flawed segments", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
+
+		mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
+		dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_SegmentInfo,
+			},
+			SegmentIDs:       []UniqueID{1},
+			IncludeUnHealthy: true,
+		}).Return(&datapb.GetSegmentInfoResponse{
+			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Infos: []*datapb.SegmentInfo{
+				{ID: 1, InsertChannel: "dml_channel_01"},
+			},
+		}, nil)
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.NoError(t, err)
+
+		info, err := meta.getSegmentInfoByID(1)
+		assert.NoError(t, err)
+		assert.Equal(t, "dml_channel_01", info.DmChannel)
+	})
+
+	t.Run("GetSegmentInfo_error", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
+
+		mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
+		dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_SegmentInfo,
+			},
+			SegmentIDs:       []UniqueID{1},
+			IncludeUnHealthy: true,
+		}).Return((*datapb.GetSegmentInfoResponse)(nil), errors.New("mock"))
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.Error(t, err)
+	})
+
+	t.Run("GetSegmentInfo_fail", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
+
+		mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
+		dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_SegmentInfo,
+			},
+			SegmentIDs:       []UniqueID{1},
+			IncludeUnHealthy: true,
+		}).Return(&datapb.GetSegmentInfoResponse{
+			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SegmentNotFound},
+		}, nil)
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.Error(t, err)
+	})
+
+	t.Run("segments patched not found", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
+
+		mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
+		dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_SegmentInfo,
+			},
+			SegmentIDs:       []UniqueID{1},
+			IncludeUnHealthy: true,
+		}).Return(&datapb.GetSegmentInfoResponse{
+			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Infos: []*datapb.SegmentInfo{
+				{ID: 2, InsertChannel: "dml_channel_01"},
+			},
+		}, nil)
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.Error(t, err)
+	})
+
+	t.Run("save patched segment failed", func(t *testing.T) {
+		mkv := &mockKV{}
+		dc := &mockDataCoord{}
+
+		meta := &MetaReplica{
+			ctx:             context.Background(),
+			collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
+			dmChannelInfos:  map[string]*querypb.DmChannelWatchInfo{},
+			segmentsInfo:    newSegmentsInfo(mkv),
+			replicas:        NewReplicaInfos(),
+			client:          mkv,
+			dataCoord:       dc,
+		}
+
+		mkv.Test(t)
+		dc.Test(t)
+		meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
+
+		mkv.On("Save", mock.Anything, mock.Anything).Return(errors.New("mocked"))
+		dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
+			Base: &commonpb.MsgBase{
+				MsgType: commonpb.MsgType_SegmentInfo,
+			},
+			SegmentIDs:       []UniqueID{1},
+			IncludeUnHealthy: true,
+		}).Return(&datapb.GetSegmentInfoResponse{
+			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Infos: []*datapb.SegmentInfo{
+				{ID: 1, InsertChannel: "dml_channel_01"},
+			},
+		}, nil)
+
+		err := meta.fixSegmentInfoDMChannel()
+		assert.Error(t, err)
+	})
+
+}
diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go
index fdd1bb4826..a632d1871b 100644
--- a/internal/querycoord/query_coord.go
+++ b/internal/querycoord/query_coord.go
@@ -165,6 +165,17 @@ func (qc *QueryCoord) Init() error {
 			log.Error("query coordinator init meta failed", zap.Error(initError))
 			return
 		}
+		meta, ok := qc.meta.(*MetaReplica)
+		if !ok {
+			panic("QueryCoord qc.meta assertion of MetaReplica error")
+		}
+
+		meta.dataCoord = qc.dataCoordClient
+		// best effort: a failed fix is only logged and does not abort Init
+		fixErr := meta.fixSegmentInfoDMChannel()
+		if fixErr != nil {
+			log.Error("QueryCoord newMeta fixSegmentInfoDMChannel failed", zap.Error(fixErr))
+		}
 
 		// init channelUnsubscribeHandler
 		qc.channelCleaner, initError = NewChannelCleaner(qc.loopCtx, qc.kvClient, qc.factory)