From 368686e149cfd5daef46ad109f5da25a6a4a4879 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 8 Dec 2022 11:19:19 +0800 Subject: [PATCH] Prevent flushWatcher watches new segment just after GetFlushedSegments (#21058) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- internal/indexcoord/flush_segment_watcher.go | 3 +++ internal/indexcoord/task.go | 8 +++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/indexcoord/flush_segment_watcher.go b/internal/indexcoord/flush_segment_watcher.go index ac7ba5fcc0..a4cf85b784 100644 --- a/internal/indexcoord/flush_segment_watcher.go +++ b/internal/indexcoord/flush_segment_watcher.go @@ -275,6 +275,9 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { } func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error { + // Make sure index is not being written. + fsw.ic.indexGCLock.Lock() + defer fsw.ic.indexGCLock.Unlock() fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "") if len(fieldIndexes) == 0 { log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID), diff --git a/internal/indexcoord/task.go b/internal/indexcoord/task.go index bf561b4ac1..86a93a2972 100644 --- a/internal/indexcoord/task.go +++ b/internal/indexcoord/task.go @@ -135,9 +135,6 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error { } func (cit *CreateIndexTask) createIndexAtomic(index *model.Index, segmentsInfo []*datapb.SegmentInfo) ([]UniqueID, []*datapb.SegmentInfo, error) { - cit.indexCoordClient.indexGCLock.RLock() - defer cit.indexCoordClient.indexGCLock.RUnlock() - buildIDs := make([]UniqueID, 0) segments := make([]*datapb.SegmentInfo, 0) for _, segmentInfo := range segmentsInfo { @@ -192,6 +189,11 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error { UserIndexParams: cit.req.GetUserIndexParams(), } + // lock before GetFlushedSegments, + // so the flush watcher cannot pick up a newly flushed 
segment right after the flushed segments are fetched and take the lock first. + cit.indexCoordClient.indexGCLock.RLock() + defer cit.indexCoordClient.indexGCLock.RUnlock() + // Get flushed segments flushedSegments, err := cit.dataCoordClient.GetFlushedSegments(cit.ctx, &datapb.GetFlushedSegmentsRequest{ Base: commonpbutil.NewMsgBase(