mirror of https://github.com/milvus-io/milvus.git
Prevent flushWatcher watches new segment just after GetFlushedSegments (#21058)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com> Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/21066/head
parent
d758a09490
commit
368686e149
|
@ -275,6 +275,9 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
|
|||
}
|
||||
|
||||
func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
|
||||
// Make sure index is not being written.
|
||||
fsw.ic.indexGCLock.Lock()
|
||||
defer fsw.ic.indexGCLock.Unlock()
|
||||
fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "")
|
||||
if len(fieldIndexes) == 0 {
|
||||
log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
|
||||
|
|
|
@ -135,9 +135,6 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (cit *CreateIndexTask) createIndexAtomic(index *model.Index, segmentsInfo []*datapb.SegmentInfo) ([]UniqueID, []*datapb.SegmentInfo, error) {
|
||||
cit.indexCoordClient.indexGCLock.RLock()
|
||||
defer cit.indexCoordClient.indexGCLock.RUnlock()
|
||||
|
||||
buildIDs := make([]UniqueID, 0)
|
||||
segments := make([]*datapb.SegmentInfo, 0)
|
||||
for _, segmentInfo := range segmentsInfo {
|
||||
|
@ -192,6 +189,11 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
|||
UserIndexParams: cit.req.GetUserIndexParams(),
|
||||
}
|
||||
|
||||
// lock before GetFlushedSegments,
|
||||
// prevent the flush watcher watches the new flushed segment just after getting the flushed segments, and it locks firstly.
|
||||
cit.indexCoordClient.indexGCLock.RLock()
|
||||
defer cit.indexCoordClient.indexGCLock.RUnlock()
|
||||
|
||||
// Get flushed segments
|
||||
flushedSegments, err := cit.dataCoordClient.GetFlushedSegments(cit.ctx, &datapb.GetFlushedSegmentsRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
|
|
Loading…
Reference in New Issue