Acquire reference lock on compacted-from segments during index building (#17619)

- Acquire reference lock during index building
- Keep dropped segment record
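
In practice this means that, for a segment produced by compaction, RootCoord now acquires a reference lock on the "compacted from" segments before calling createIndexForSegment and releases it in a defer when the handler returns, so the source segments cannot be released out from under the index build. A minimal sketch of that pattern, assuming a hypothetical refLocker interface in place of the CallAddSegRefLock / CallReleaseSegRefLock helpers shown in the diff below:

package reflock

import (
    "context"
    "fmt"
)

// refLocker is a hypothetical stand-in for the RootCoord broker calls
// (CallAddSegRefLock / CallReleaseSegRefLock) used in this PR.
type refLocker interface {
    Acquire(ctx context.Context, taskID int64, segIDs []int64) error
    Release(ctx context.Context, taskID int64, segIDs []int64) error
}

// buildIndexWithRefLock sketches the acquire -> build -> deferred release flow.
func buildIndexWithRefLock(ctx context.Context, taskID int64, compactFrom []int64,
    locker refLocker, build func(context.Context) error) error {
    // Hold a reference on the compacted-from segments so their binlogs
    // cannot be garbage-collected while the index build is set up.
    if err := locker.Acquire(ctx, taskID, compactFrom); err != nil {
        return fmt.Errorf("acquire segment reference lock failed: %w", err)
    }
    defer func() {
        if err := locker.Release(ctx, taskID, compactFrom); err != nil {
            // Mirror the PR: surface a failed release loudly so the
            // reference manager can detect the leaked lock.
            panic(err)
        }
    }()
    return build(ctx)
}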

Co-authored-by: Letian Jiang <letian.jiang@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
Letian Jiang 2022-06-18 01:24:11 +08:00 committed by GitHub
parent 7c69f4b338
commit b15c24a554
3 changed files with 70 additions and 4 deletions


@@ -865,7 +865,7 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
}
for _, s := range segments {
m.segments.DropSegment(s.GetID())
m.segments.SetSegment(s.GetID(), s)
}
// Handle empty segment generated by merge-compaction

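The first hunk is the "keep dropped segment record" part: instead of deleting a compacted-away segment from the datacoord meta, CompleteMergeCompaction now writes the segment back with SetSegment (the surrounding code presumably marks it dropped), so later lookups such as reference-lock bookkeeping can still resolve it. A simplified illustration of the difference, with segmentsInfo and SegmentRecord standing in for the real datacoord types:

package meta

// SegmentRecord is a simplified stand-in for the real segment info type.
type SegmentRecord struct {
    ID    int64
    State string // e.g. "Flushed", "Dropped"
}

// segmentsInfo is a simplified stand-in for the datacoord segments collection.
type segmentsInfo struct {
    segments map[int64]*SegmentRecord
}

// Old behavior: DropSegment erased the entry, losing all trace of the segment.
func (s *segmentsInfo) DropSegment(id int64) {
    delete(s.segments, id)
}

// New behavior: SetSegment stores the record back (assumed already marked
// Dropped by the caller), so the dropped segment stays visible to later lookups.
func (s *segmentsInfo) SetSegment(id int64, record *SegmentRecord) {
    s.segments[id] = record
}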

@@ -902,7 +902,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
c.CallRemoveIndexService = func(ctx context.Context, buildIDs []UniqueID) (retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("get index state from index service panic, msg = %v", err)
retErr = fmt.Errorf("remove index from index service panic, msg = %v", err)
}
}()
<-initCh
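
The CallRemoveIndexService hunk above only corrects the recovered-panic message (from "get index state" to "remove index"), but it also shows the wrapper pattern shared by the Call* helpers: a deferred recover converts a panic inside the RPC path into a returned error. A self-contained sketch of that pattern, with removeIndex as a placeholder rather than the real IndexCoord RPC:

package wrapper

import (
    "context"
    "fmt"
)

// callRemoveIndex sketches the recover-to-error pattern used by the Call* helpers.
// removeIndex is a placeholder for the real IndexCoord call.
func callRemoveIndex(ctx context.Context, buildIDs []int64,
    removeIndex func(context.Context, []int64) error) (retErr error) {
    defer func() {
        if r := recover(); r != nil {
            // The named return lets the deferred closure rewrite the error
            // after a panic anywhere inside the call.
            retErr = fmt.Errorf("remove index from index service panic, msg = %v", r)
        }
    }()
    return removeIndex(ctx, buildIDs)
}
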
@@ -2242,7 +2242,7 @@ func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.In
}
// SegmentFlushCompleted check whether segment flush has completed
func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (status *commonpb.Status, err error) {
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@@ -2252,7 +2252,23 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
log.Info("SegmentFlushCompleted received", zap.Int64("msgID", in.Base.MsgID), zap.Int64("collID", in.Segment.CollectionID),
zap.Int64("partID", in.Segment.PartitionID), zap.Int64("segID", in.Segment.ID), zap.Int64s("compactFrom", in.Segment.CompactionFrom))
err := c.createIndexForSegment(ctx, in.Segment.CollectionID, in.Segment.PartitionID, in.Segment.ID, in.Segment.NumOfRows, in.Segment.Binlogs)
// acquire reference lock before building index
if in.Segment.CreatedByCompaction {
log.Debug("try to acquire segment reference lock", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom))
if err := c.CallAddSegRefLock(ctx, in.Base.MsgID, in.Segment.CompactionFrom); err != nil {
log.Warn("acquire segment reference lock failed", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom))
return failStatus(commonpb.ErrorCode_UnexpectedError, "AcquireSegRefLock failed: "+err.Error()), nil
}
defer func() {
if err := c.CallReleaseSegRefLock(ctx, in.Base.MsgID, in.Segment.CompactionFrom); err != nil {
log.Warn("release segment reference lock failed", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom))
// panic to let ref manager detect release failure
panic(err)
}
}()
}
err = c.createIndexForSegment(ctx, in.Segment.CollectionID, in.Segment.PartitionID, in.Segment.ID, in.Segment.NumOfRows, in.Segment.Binlogs)
if err != nil {
log.Error("createIndexForSegment", zap.Int64("msgID", in.Base.MsgID), zap.Int64("collID", in.Segment.CollectionID),
zap.Int64("partID", in.Segment.PartitionID), zap.Int64("segID", in.Segment.ID), zap.Error(err))
@@ -2275,6 +2291,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
zap.Int64s("compactFrom", in.Segment.CompactionFrom), zap.Error(err))
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
log.Debug("SegmentFlushCompleted success", zap.String("role", typeutil.RootCoordRole),
zap.Int64("collection id", in.Segment.CollectionID), zap.Int64("partition id", in.Segment.PartitionID),
zap.Int64("segment id", in.Segment.ID), zap.Int64("msgID", in.Base.MsgID))

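Two details of the SegmentFlushCompleted change are easy to miss: the signature now uses named return values, and the lock release sits in a defer so it runs on every return path; if the release itself fails, the handler panics instead of silently leaking the reference ("panic to let ref manager detect release failure"). A standalone demonstration of that Go behavior, not Milvus code:

package main

import "fmt"

// release stands in for CallReleaseSegRefLock failing.
func release() error { return fmt.Errorf("release failed") }

func handler() (status string, err error) {
    defer func() {
        if e := release(); e != nil {
            // The panic fires after the return values have been set,
            // overriding the normal return and surfacing the leaked lock.
            panic(e)
        }
    }()
    return "success", nil
}

func main() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered:", r) // prints: recovered: release failed
        }
    }()
    handler()
}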

@@ -381,6 +381,15 @@ func (idx *indexMock) DropIndex(ctx context.Context, req *indexpb.DropIndexReque
}, nil
}
func (idx *indexMock) RemoveIndex(ctx context.Context, req *indexpb.RemoveIndexRequest) (*commonpb.Status, error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
}, nil
}
func (idx *indexMock) getFileArray() []string {
idx.mutex.Lock()
defer idx.mutex.Unlock()
@@ -1443,6 +1452,46 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, Params.CommonCfg.DefaultIndexName, rsp.IndexDescriptions[0].IndexName)
})
t.Run("flush segment from compaction", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
partID := coll.PartitionIDs[1]
flushMsg := datapb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
},
Segment: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: coll.ID,
PartitionID: partID,
CompactionFrom: []int64{segID},
CreatedByCompaction: true,
},
}
st, err := core.SegmentFlushCompleted(ctx, &flushMsg)
assert.NoError(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_Success)
req := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
MsgID: 210,
Timestamp: 210,
SourceID: 210,
},
DbName: "",
CollectionName: collName,
FieldName: "vector",
IndexName: "",
}
rsp, err := core.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, 1, len(rsp.IndexDescriptions))
assert.Equal(t, Params.CommonCfg.DefaultIndexName, rsp.IndexDescriptions[0].IndexName)
})
wg.Add(1)
t.Run("import", func(t *testing.T) {
defer wg.Done()
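
The new "flush segment from compaction" subtest drives the full path: it sends a SegmentFlushCompletedMsg with CreatedByCompaction set and a non-empty CompactionFrom list, expects a Success status, and then uses DescribeIndex to confirm the index was created under the default index name. If one wanted the lock handling itself to be observable in such a test, a counting stub could be dropped in for the broker calls; this is a hypothetical helper, not part of the Milvus test suite:

package locktest

import (
    "context"
    "sync"
)

// lockCounter is a hypothetical stub that records how often the segment
// reference lock is acquired and released, so a test can assert they balance.
type lockCounter struct {
    mu       sync.Mutex
    acquired int
    released int
}

func (l *lockCounter) AddSegRefLock(ctx context.Context, taskID int64, segIDs []int64) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.acquired++
    return nil
}

func (l *lockCounter) ReleaseSegRefLock(ctx context.Context, taskID int64, segIDs []int64) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.released++
    return nil
}

// After SegmentFlushCompleted returns, the test could then assert:
//   assert.Equal(t, counter.acquired, counter.released)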