Fix segment DmChannel in querycoord for compatibility to 2.0.2 (#18397)

Reload segment dm channel if empty channel found

Patch originally authored by @czs007
Unit test added by @congqixia

Co-authored-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Co-authored-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/18399/head
congqixia 2022-07-25 14:14:31 +08:00 committed by GitHub
parent 45746bb736
commit 128c66f301
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 286 additions and 1 deletions

View File

@ -95,6 +95,8 @@ type Meta interface {
updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error
}
var _ Meta = (*MetaReplica)(nil)
// MetaReplica records the current load information on all querynodes
type MetaReplica struct {
ctx context.Context
@ -149,6 +151,53 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl
return m, nil
}
func (m *MetaReplica) fixSegmentInfoDMChannel() error {
var segmentIDs []UniqueID
for id, info := range m.segmentsInfo.segmentIDMap {
if info.GetDmChannel() == "" {
segmentIDs = append(segmentIDs, id)
}
}
if len(segmentIDs) == 0 {
log.Info("QueryCoord MetaReplica no need to fix SegmentInfo DmChannel")
return nil
}
//var segmentInfos []*datapb.SegmentInfo
infoResp, err := m.dataCoord.GetSegmentInfo(m.ctx, &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: segmentIDs,
IncludeUnHealthy: true,
})
if err != nil {
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return err
}
if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(infoResp.GetStatus().Reason)
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return err
}
for _, newInfo := range infoResp.Infos {
segment, err := m.getSegmentInfoByID(newInfo.GetID())
if err != nil {
log.Warn("failed to find original patched segment", zap.Int64("segmentID", newInfo.GetID()), zap.Error(err))
return err
}
segment.DmChannel = newInfo.GetInsertChannel()
err = m.segmentsInfo.saveSegment(segment)
if err != nil {
log.Warn("failed to save patched segment", zap.Int64("segmentID", newInfo.GetID()), zap.Error(err))
return err
}
}
return nil
}
func (m *MetaReplica) reloadFromKV() error {
log.Info("recovery collections...")
collectionKeys, collectionValues, err := m.getKvClient().LoadWithPrefix(collectionMetaPrefix)
@ -283,7 +332,6 @@ func (m *MetaReplica) reloadFromKV() error {
//TODO::update partition states
log.Info("reload from kv finished")
return nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -35,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/etcd"
)
@ -645,3 +647,228 @@ func MockSaveSegments(segmentNum int) col2SegmentInfos {
return saves
}
type mockKV struct {
kv.MetaKv
mock.Mock
}
func (m *mockKV) Save(k, v string) error {
args := m.Called(k, v)
return args.Error(0)
}
type mockDataCoord struct {
types.DataCoord
mock.Mock
}
func (m *mockDataCoord) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*datapb.GetSegmentInfoResponse), args.Error(1)
}
func TestFixSegmentInfoDmlChannel(t *testing.T) {
t.Run("No flawed segments", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: "dml_channel_01"}
err := meta.fixSegmentInfoDMChannel()
assert.NoError(t, err)
mkv.Mock.AssertNotCalled(t, "Save", mock.AnythingOfType("string"), mock.AnythingOfType("string"))
dc.Mock.AssertNotCalled(t, "GetSegmentInfo", mock.Anything, mock.Anything)
})
t.Run("with flawed segments", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: []UniqueID{1},
IncludeUnHealthy: true,
}).Return(&datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Infos: []*datapb.SegmentInfo{
{ID: 1, InsertChannel: "dml_channel_01"},
},
}, nil)
err := meta.fixSegmentInfoDMChannel()
assert.NoError(t, err)
info, err := meta.getSegmentInfoByID(1)
assert.NoError(t, err)
assert.NotEqual(t, "", info.DmChannel)
})
t.Run("GetSegmentInfo_error", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: []UniqueID{1},
IncludeUnHealthy: true,
}).Return((*datapb.GetSegmentInfoResponse)(nil), errors.New("mock"))
err := meta.fixSegmentInfoDMChannel()
assert.Error(t, err)
})
t.Run("GetSegmentInfo_fail", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: []UniqueID{1},
IncludeUnHealthy: true,
}).Return(&datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SegmentNotFound},
}, nil)
err := meta.fixSegmentInfoDMChannel()
assert.Error(t, err)
})
t.Run("segments patched not found", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
mkv.On("Save", mock.Anything, mock.Anything).Return(nil)
dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: []UniqueID{1},
IncludeUnHealthy: true,
}).Return(&datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Infos: []*datapb.SegmentInfo{
{ID: 2, InsertChannel: "dml_channel_01"},
},
}, nil)
err := meta.fixSegmentInfoDMChannel()
assert.Error(t, err)
})
t.Run("save patched segment failed", func(t *testing.T) {
mkv := &mockKV{}
dc := &mockDataCoord{}
meta := &MetaReplica{
ctx: context.Background(),
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: newSegmentsInfo(mkv),
replicas: NewReplicaInfos(),
client: mkv,
dataCoord: dc,
}
mkv.Test(t)
dc.Test(t)
meta.segmentsInfo.segmentIDMap[1] = &querypb.SegmentInfo{SegmentID: 1, DmChannel: ""}
mkv.On("Save", mock.Anything, mock.Anything).Return(errors.New("mocked"))
dc.On("GetSegmentInfo", context.Background(), &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
SegmentIDs: []UniqueID{1},
IncludeUnHealthy: true,
}).Return(&datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Infos: []*datapb.SegmentInfo{
{ID: 1, InsertChannel: "dml_channel_01"},
},
}, nil)
err := meta.fixSegmentInfoDMChannel()
assert.Error(t, err)
})
}

View File

@ -165,6 +165,16 @@ func (qc *QueryCoord) Init() error {
log.Error("query coordinator init meta failed", zap.Error(initError))
return
}
meta, ok := qc.meta.(*MetaReplica)
if !ok {
panic("QueryCoord qc.meta assertion of MetaReplica error")
}
meta.dataCoord = qc.dataCoordClient
fixErr := meta.fixSegmentInfoDMChannel()
if fixErr != nil {
log.Error("QueryCoord newMeta fixSegmentInfoDMChannel failed", zap.Error(fixErr))
}
// init channelUnsubscribeHandler
qc.channelCleaner, initError = NewChannelCleaner(qc.loopCtx, qc.kvClient, qc.factory)