mirror of https://github.com/milvus-io/milvus.git
Reduce log frequency in IndexCoord (#20638)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/20658/head
parent 15ae68faa5
commit 38845d9669
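Most of the hunks below either delete per-iteration Debug logs from the scheduling loops or switch them to the rated variants of Milvus's logger (RatedDebug, RatedInfo, RatedWarn), whose first argument throttles how often the message can be emitted, while rarely hit error paths are promoted from rated or Error calls to plain Warn. As a rough, self-contained sketch of what such a rated logger does (the helper below is an illustration built on zap, not the project's actual log package):

package main

import (
	"sync"
	"time"

	"go.uber.org/zap"
)

// ratedLogger suppresses a message so it is emitted at most once per
// interval; this is only a sketch of the idea behind rated logging.
type ratedLogger struct {
	mu     sync.Mutex
	last   map[string]time.Time
	logger *zap.Logger
}

func newRatedLogger(l *zap.Logger) *ratedLogger {
	return &ratedLogger{last: map[string]time.Time{}, logger: l}
}

// RatedDebug logs msg at debug level at most once every `every` seconds.
func (r *ratedLogger) RatedDebug(every float64, msg string, fields ...zap.Field) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if t, ok := r.last[msg]; ok && time.Since(t) < time.Duration(every*float64(time.Second)) {
		return // dropped: the same message fired too recently
	}
	r.last[msg] = time.Now()
	r.logger.Debug(msg, fields...)
}

func main() {
	l, _ := zap.NewDevelopment()
	rl := newRatedLogger(l)
	for i := 0; i < 100; i++ {
		// emitted roughly once per 10-second window instead of 100 times
		rl.RatedDebug(10, "index task is processing", zap.Int("iteration", i))
		time.Sleep(50 * time.Millisecond)
	}
}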
@@ -140,6 +140,8 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segment *datapb.SegmentInf
}
}
}
log.Ctx(fsw.ctx).Info("flushed segment task enqueue successfully", zap.Int64("segID", segment.GetID()),
zap.Bool("isFake", segment.GetIsFake()))
}

func (fsw *flushedSegmentWatcher) internalScheduler() {

@@ -229,13 +231,10 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg
if _, ok := fsw.internalTasks[segID]; ok {
fsw.internalTasks[segID].segmentInfo = segInfo
}
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID))
}

func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
t := fsw.getInternalTask(segID)
log.Ctx(fsw.ctx).RatedDebug(10, "flushedSegmentWatcher process internal task", zap.Int64("segID", segID),
zap.String("state", t.state.String()))

switch t.state {
case indexTaskPrepare:

@@ -247,13 +246,13 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
}

if err := fsw.prepare(segID); err != nil {
log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
log.Ctx(fsw.ctx).Warn("flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.updateInternalTaskState(segID, indexTaskInit)
case indexTaskInit:
if err := fsw.constructTask(t); err != nil {
log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
log.Ctx(fsw.ctx).Warn("flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.updateInternalTaskState(segID, indexTaskInProgress)

@@ -265,8 +264,6 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
}
case indexTaskDone:
if err := fsw.removeFlushedSegment(t); err != nil {
log.Ctx(fsw.ctx).RatedWarn(10, "IndexCoord flushSegmentWatcher removeFlushedSegment fail",
zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.deleteInternalTask(segID)

@@ -310,17 +307,15 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
// send to indexBuilder
have, buildID, err := fsw.ic.createIndexForSegment(segIdx)
if err != nil {
log.Ctx(fsw.ctx).Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
return err
}
if !have {
fsw.builder.enqueue(buildID)
}
log.Ctx(fsw.ctx).Info("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
zap.Int64("buildID", buildID), zap.Bool("already have index task", have))
}
fsw.handoff.enqueue(t.segmentInfo)
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
zap.Int("segments num", len(fieldIndexes)))
return nil
}

@@ -328,25 +323,24 @@ func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error {
deletedKeys := fmt.Sprintf("%s/%d/%d/%d", util.FlushedSegmentPrefix, t.segmentInfo.CollectionID, t.segmentInfo.PartitionID, t.segmentInfo.ID)
err := fsw.kvClient.RemoveWithPrefix(deletedKeys)
if err != nil {
log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID),
log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment key fail", zap.Int64("collID", t.segmentInfo.CollectionID),
zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID), zap.Error(err))
return err
}
log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID),
log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment key success", zap.Int64("collID", t.segmentInfo.CollectionID),
zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID))
return nil
}

func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
defer fsw.internalNotifyFunc()
log.Debug("prepare flushed segment task", zap.Int64("segID", segID))
t := fsw.getInternalTask(segID)
if t.segmentInfo != nil {
return nil
}
info, err := fsw.ic.pullSegmentInfo(fsw.ctx, segID)
if err != nil {
log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID),
log.Warn("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID),
zap.Error(err))
if errors.Is(err, ErrSegmentNotFound) {
fsw.deleteInternalTask(segID)

@@ -355,5 +349,6 @@ func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
return err
}
fsw.setInternalTaskSegmentInfo(segID, info)
log.Ctx(fsw.ctx).Info("flushedSegmentWatcher prepare task success", zap.Int64("segID", segID))
return nil
}
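For orientation, internalProcess walks each flushed segment through a small state machine (prepare, init, inProgress, done); the scheduler revisits every pending task on each pass, which is why the per-call RatedDebug above could be dropped while the Warn on each failure path is kept. A much-simplified, hypothetical sketch of that control flow (the state names and step function below are stand-ins, not the real types):

package main

import "fmt"

type taskState int

const (
	taskPrepare taskState = iota
	taskInit
	taskInProgress
	taskDone
)

// processOnce advances a task by at most one state per scheduler pass.
// On failure it just returns; the next pass retries from the same state.
func processOnce(state taskState, step func(taskState) error) taskState {
	if state == taskDone {
		return taskDone
	}
	if err := step(state); err != nil {
		fmt.Println("step failed, will retry on next pass:", err)
		return state
	}
	return state + 1
}

func main() {
	state := taskPrepare
	for state != taskDone {
		state = processOnce(state, func(taskState) error { return nil })
	}
	fmt.Println("task finished")
}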
@@ -95,7 +95,7 @@ func (hd *handoff) enqueue(segment *datapb.SegmentInfo) {

// note: don't reset state if the task contains state
hd.segments[segment.GetID()] = segment
log.Ctx(hd.ctx).Info("segment need to write handoff",
log.Ctx(hd.ctx).Info("handoff task enqueue successfully",
zap.Int64("segID", segment.GetID()),
zap.Bool("isFake", segment.GetIsFake()),
)

@@ -151,13 +151,13 @@ func (hd *handoff) run() {
if len(segIDs) > 0 {
log.Ctx(hd.ctx).Debug("handoff process...", zap.Int("task num", len(segIDs)))
}
for i, segID := range segIDs {
hd.process(segID, i == 0)
for _, segID := range segIDs {
hd.process(segID)
}
}

func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo, front bool) {
if front || hd.allParentsDone(segment.GetCompactionFrom()) {
func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo) {
if hd.allParentsDone(segment.GetCompactionFrom()) {
handoffSegment := &querypb.SegmentInfo{
SegmentID: segment.GetID(),
CollectionID: segment.GetCollectionID(),

@@ -180,24 +180,24 @@ func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo, front bool)
}
}

func (hd *handoff) process(segID UniqueID, front bool) {
func (hd *handoff) process(segID UniqueID) {
hd.taskMutex.RLock()
segment, ok := hd.segments[segID]
hd.taskMutex.RUnlock()

if !ok {
log.Ctx(hd.ctx).Warn("handoff get segment fail", zap.Int64("segID", segID))
log.Ctx(hd.ctx).Warn("handoff get task fail", zap.Int64("segID", segID))
return
}

if segment.GetIsFake() {
hd.handoffFakedSegment(segment, front)
hd.handoffFakedSegment(segment)
return
}

state := hd.meta.GetSegmentIndexState(segID)
log.Ctx(hd.ctx).RatedDebug(30, "handoff task is process", zap.Int64("segID", segID),
zap.Bool("front", front), zap.String("state", state.state.String()))
zap.String("state", state.state.String()))
if state.state == commonpb.IndexState_Failed {
log.Ctx(hd.ctx).Error("build index failed, may be need manual intervention", zap.Int64("segID", segID),
zap.String("fail reason", state.failReason))

@@ -210,7 +210,7 @@ func (hd *handoff) process(segID UniqueID, front bool) {
info, err := hd.ic.pullSegmentInfo(hd.ctx, segID)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
log.Ctx(hd.ctx).Error("handoff get segment fail", zap.Error(err))
log.Ctx(hd.ctx).Warn("handoff get segment fail, remove task", zap.Error(err))
hd.deleteTask(segID)
return
}

@@ -221,12 +221,13 @@ func (hd *handoff) process(segID UniqueID, front bool) {
log.Debug("segment is importing, can't write handoff event", zap.Int64("segID", segID))
return
}
if front || hd.allParentsDone(info.CompactionFrom) {
log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID), zap.Bool("front", front),
if hd.allParentsDone(info.CompactionFrom) {
log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID),
zap.Int64s("compactionFrom", info.CompactionFrom))
indexInfos := hd.meta.GetSegmentIndexes(segID)
if len(indexInfos) == 0 {
log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped", zap.Int64("segID", segID))
log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped, remove task",
zap.Int64("segID", segID))
hd.deleteTask(segID)
return
}

@@ -255,12 +256,12 @@ func (hd *handoff) process(segID UniqueID, front bool) {
})
}

log.Ctx(hd.ctx).Info("write handoff task success", zap.Int64("segID", segID))
if !hd.meta.AlreadyWrittenHandoff(segID) {
if err := hd.writeHandoffSegment(handoffTask); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("write handoff success", zap.Int64("segID", segID))
if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return

@@ -271,6 +272,8 @@ func (hd *handoff) process(segID UniqueID, front bool) {
hd.deleteTask(segID)
return
}
log.Ctx(hd.ctx).RatedDebug(5, "the handoff of the parent segment has not been written yet",
zap.Int64("segID", segID), zap.Int64s("compactionFrom", info.CompactionFrom))
}
}
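The last two handoff hunks guard the etcd write with AlreadyWrittenHandoff and only call MarkSegmentWriteHandoff after the write succeeds, so a task that fails part-way is simply retried on the next pass. A rough sketch of that write-then-mark shape, using hypothetical interfaces in place of the real meta table and kv client:

package main

import "fmt"

// handoffMeta mirrors the two meta-table calls used above; it is a stand-in,
// not the real IndexCoord meta table.
type handoffMeta interface {
	AlreadyWrittenHandoff(segID int64) bool
	MarkSegmentWriteHandoff(segID int64) error
}

type memMeta struct{ written map[int64]bool }

func (m *memMeta) AlreadyWrittenHandoff(segID int64) bool { return m.written[segID] }

func (m *memMeta) MarkSegmentWriteHandoff(segID int64) error {
	m.written[segID] = true
	return nil
}

// writeHandoff is retried by the caller until it returns nil; both steps are
// safe to repeat, so failing between them only means one extra write later.
func writeHandoff(meta handoffMeta, segID int64, write func(int64) error) error {
	if !meta.AlreadyWrittenHandoff(segID) {
		if err := write(segID); err != nil {
			return fmt.Errorf("write handoff for segment %d: %w", segID, err)
		}
	}
	return meta.MarkSegmentWriteHandoff(segID)
}

func main() {
	meta := &memMeta{written: map[int64]bool{}}
	fmt.Println(writeHandoff(meta, 100, func(int64) error { return nil }))
}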
@@ -177,7 +177,7 @@ func Test_newHandoff(t *testing.T) {
func Test_process(t *testing.T) {
t.Run("not found segment", func(t *testing.T) {
hd := &handoff{segments: map[UniqueID]*datapb.SegmentInfo{}}
hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})

@@ -197,7 +197,7 @@ func Test_process(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})
}

@@ -240,7 +240,7 @@ func Test_handoff_error(t *testing.T) {
},
},
}
hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 1, hd.Len())

hd.ic.dataCoordClient = &DataCoordMock{

@@ -248,7 +248,7 @@ func Test_handoff_error(t *testing.T) {
return nil, errSegmentNotFound(segID)
},
}
hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})

@@ -305,7 +305,7 @@ func Test_handoff_error(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})

@@ -343,7 +343,7 @@ func Test_handoff_error(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})

@@ -386,7 +386,7 @@ func Test_handoff_error(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})

@@ -407,7 +407,7 @@ func Test_handoff_error(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})

@@ -450,7 +450,7 @@ func Test_handoff_error(t *testing.T) {
},
}

hd.process(segID, true)
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})
}
@@ -132,6 +132,7 @@ func (ib *indexBuilder) enqueue(buildID UniqueID) {
if _, ok := ib.tasks[buildID]; !ok {
ib.tasks[buildID] = indexTaskInit
}
log.Info("indexBuilder enqueue task", zap.Int64("buildID", buildID))
}

func (ib *indexBuilder) schedule() {

@@ -174,7 +175,7 @@ func (ib *indexBuilder) run() {
for _, buildID := range buildIDs {
ok := ib.process(buildID)
if !ok {
log.Ctx(ib.ctx).Debug("there is no IndexNode available or etcd is not serviceable, wait a minute...")
log.Ctx(ib.ctx).Info("there is no IndexNode available or etcd is not serviceable, wait a minute...")
break
}
}

@@ -197,10 +198,9 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
delete(ib.tasks, buildID)
}

log.Ctx(ib.ctx).RatedDebug(10, "index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String()))
meta, exist := ib.meta.GetMeta(buildID)
if !exist {
log.Ctx(ib.ctx).RatedDebug(10, "index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).RatedDebug(5, "index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
deleteFunc(buildID)
return true
}

@@ -208,7 +208,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
switch state {
case indexTaskInit:
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).RatedDebug(5, "task is no need to build index, remove it", zap.Int64("buildID", buildID))
deleteFunc(buildID)
return true
}

@@ -223,7 +223,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
SerializedSize: 0,
FailReason: "",
}); err != nil {
log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
return false
}
updateStateFunc(buildID, indexTaskDone)

@@ -233,25 +233,25 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
// if all IndexNodes are executing task, wait for one of them to finish the task.
nodeID, client := ib.ic.nodeManager.PeekClient(meta)
if client == nil {
log.Ctx(ib.ctx).RatedDebug(10, "index builder peek client error, there is no available")
log.Ctx(ib.ctx).RatedInfo(5, "index builder peek client error, there is no available")
return false
}
// update version and set nodeID
if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
log.Ctx(ib.ctx).RatedWarn(10, "index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
log.Ctx(ib.ctx).Warn("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
return false
}

// acquire lock
if err := ib.ic.tryAcquireSegmentReferLock(ib.ctx, buildID, nodeID, []UniqueID{meta.SegmentID}); err != nil {
log.Ctx(ib.ctx).RatedWarn(10, "index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Warn("index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
updateStateFunc(buildID, indexTaskRetry)
return false
}
info, err := ib.ic.pullSegmentInfo(ib.ctx, meta.SegmentID)
if err != nil {
log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
log.Ctx(ib.ctx).Warn("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
zap.Int64("buildID", buildID), zap.Error(err))
if errors.Is(err, ErrSegmentNotFound) {
updateStateFunc(buildID, indexTaskDeleted)

@@ -303,7 +303,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
TypeParams: typeParams,
NumRows: meta.NumRows,
}
log.Ctx(ib.ctx).RatedDebug(10, "assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
if err := ib.ic.assignTask(client, req); err != nil {
// need to release lock then reassign, so set task state to retry
log.Ctx(ib.ctx).RatedWarn(10, "index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),

@@ -311,21 +310,22 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
updateStateFunc(buildID, indexTaskRetry)
return false
}
log.Ctx(ib.ctx).Info("index task assigned successfully", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID), zap.Int64("nodeID", nodeID))
// update index meta state to InProgress
if err := ib.meta.BuildIndex(buildID); err != nil {
// need to release lock then reassign, so set task state to retry
log.Ctx(ib.ctx).RatedWarn(10, "index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Warn("index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
updateStateFunc(buildID, indexTaskRetry)
return false
}
log.Ctx(ib.ctx).RatedDebug(10, "index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
updateStateFunc(buildID, indexTaskInProgress)

case indexTaskDone:
log.Ctx(ib.ctx).RatedDebug(10, "index task has done", zap.Int64("buildID", buildID))
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID))
updateStateFunc(buildID, indexTaskDeleted)
return true
}
@@ -335,30 +335,26 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
deleteFunc(buildID)
case indexTaskRetry:
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
updateStateFunc(buildID, indexTaskDeleted)
return true
}
if !ib.dropIndexTask(buildID, meta.NodeID) {
log.Ctx(ib.ctx).RatedWarn(5, "drop index task fail, need retry")
return true
}
if err := ib.releaseLockAndResetTask(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
updateStateFunc(buildID, indexTaskInit)

case indexTaskDeleted:
log.Ctx(ib.ctx).Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID))
// TODO: delete after QueryCoordV2
if err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]int64{buildID}); err != nil {
return false
}

@@ -370,7 +366,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
}

@@ -378,8 +373,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
deleteFunc(buildID)

default:
log.Ctx(ib.ctx).Debug("index task is in progress", zap.Int64("buildID", buildID),
zap.String("state", meta.IndexState.String()))
// state: in_progress
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
updateStateFunc(buildID, indexTaskDeleted)

@@ -391,7 +385,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}

func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
log.Ctx(ib.ctx).Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
if exist {
ctx1, cancel := context.WithTimeout(ib.ctx, reqTimeoutInterval)

@@ -401,12 +394,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
BuildIDs: []int64{buildID},
})
if err != nil {
log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
zap.Error(err))
return indexTaskInProgress
}
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason))
return indexTaskInProgress
}

@@ -418,7 +411,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
log.Ctx(ib.ctx).Info("this task has been finished", zap.Int64("buildID", info.BuildID),
zap.String("index state", info.State.String()))
if err := ib.meta.FinishTask(info); err != nil {
log.Ctx(ib.ctx).Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
zap.String("index state", info.State.String()), zap.Error(err))
return indexTaskInProgress
}

@@ -435,11 +428,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
return indexTaskRetry
}
// !exist --> node down
log.Ctx(ib.ctx).Info("this task should be retry, indexNode is no longer exist", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
return indexTaskRetry
}

func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
if exist {
ctx1, cancel := context.WithTimeout(ib.ctx, reqTimeoutInterval)

@@ -458,21 +452,24 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
zap.Int64("nodeID", nodeID), zap.String("fail reason", status.Reason))
return false
}
log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task success",
zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
return true
}
log.Ctx(ib.ctx).Info("IndexNode no longer exist, no need to drop index task",
zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
return true
}

func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueID) error {
log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
if err := ib.meta.ResetNodeID(buildID); err != nil {
log.Ctx(ib.ctx).Error("index builder try to reset nodeID failed", zap.Error(err))
log.Ctx(ib.ctx).Warn("index builder try to reset nodeID failed", zap.Error(err))
return err
}
log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID),

@@ -481,17 +478,15 @@ func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueI
}

func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueID) error {
log.Ctx(ib.ctx).Info("release segment reference lock and reset task", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
if nodeID != 0 {
if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Error(err))
return err
}
}
if err := ib.meta.ResetMeta(buildID); err != nil {
log.Ctx(ib.ctx).Error("index builder try to reset task failed", zap.Error(err))
log.Ctx(ib.ctx).Warn("index builder try to reset task failed", zap.Error(err))
return err
}
log.Ctx(ib.ctx).Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID),
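In indexBuilder.run, process reports false when no IndexNode can take the task or a meta/etcd update fails, and the loop breaks out instead of logging the same condition for every remaining buildID; the hunk above only raises that message from Debug to Info. A simplified sketch of that break-and-wait pattern (the names below are illustrative):

package main

import (
	"fmt"
	"time"
)

// runLoop walks the pending build IDs once per tick. process returns false
// when no worker is available, so the loop stops early and waits for the
// next tick rather than logging the same condition for every remaining task.
func runLoop(buildIDs []int64, process func(int64) bool, ticks int) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < ticks; i++ {
		<-ticker.C
		for _, buildID := range buildIDs {
			if ok := process(buildID); !ok {
				fmt.Println("no IndexNode available, wait for next tick")
				break
			}
		}
	}
}

func main() {
	pending := []int64{1, 2, 3}
	runLoop(pending, func(buildID int64) bool { return buildID != 2 }, 3)
}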
@@ -1287,11 +1287,11 @@ func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*
IncludeUnHealthy: false,
})
if err != nil {
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
log.Warn("IndexCoord get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
return nil, err
}
if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID),
log.Warn("IndexCoord get segment info fail", zap.Int64("segID", segmentID),
zap.String("fail reason", resp.Status.GetReason()))
if resp.Status.GetReason() == msgSegmentNotFound(segmentID) {
return nil, errSegmentNotFound(segmentID)

@@ -1304,6 +1304,6 @@ func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*
}
}
errMsg := msgSegmentNotFound(segmentID)
log.Error(errMsg)
log.Warn(errMsg)
return nil, errSegmentNotFound(segmentID)
}
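pullSegmentInfo turns DataCoord's "segment not found" reply into a dedicated error value so the watcher, handoff, and index builder above can test for it with errors.Is and drop the task rather than retry forever. A minimal sketch of that sentinel-error pattern; the helper below is modeled on the names in the diff, not copied from the codebase:

package main

import (
	"errors"
	"fmt"
)

// ErrSegmentNotFound is the sentinel that callers test for with errors.Is.
var ErrSegmentNotFound = errors.New("segment not found")

// errSegmentNotFound wraps the sentinel with the offending segment ID so the
// message stays informative while errors.Is still matches.
func errSegmentNotFound(segID int64) error {
	return fmt.Errorf("segment %d: %w", segID, ErrSegmentNotFound)
}

func pullSegmentInfo(segID int64, exists bool) (string, error) {
	if !exists {
		return "", errSegmentNotFound(segID)
	}
	return fmt.Sprintf("info for segment %d", segID), nil
}

func main() {
	if _, err := pullSegmentInfo(100, false); errors.Is(err, ErrSegmentNotFound) {
		// drop the task instead of retrying forever
		fmt.Println("segment gone, remove task:", err)
	}
}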
@@ -1038,6 +1038,8 @@ func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
}

mt.updateIndexTasksMetrics()
log.Info("finish index task success", zap.Int64("buildID", taskInfo.BuildID),
zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()))
return nil
}