mirror of https://github.com/milvus-io/milvus.git
enhance: modify log to avoid ambiguity and improve readability (#28331)
Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/28233/head
parent
3920bbc55f
commit
70995383bf
internal/querycoordv2/task
|
@ -551,10 +551,10 @@ func (scheduler *taskScheduler) schedule(node int64) {
|
||||||
|
|
||||||
// The scheduler doesn't limit the number of tasks,
|
// The scheduler doesn't limit the number of tasks,
|
||||||
// to commit tasks to executors as soon as possible, to reach higher merge possibility
|
// to commit tasks to executors as soon as possible, to reach higher merge possibility
|
||||||
failCount := atomic.NewInt32(0)
|
commmittedNum := atomic.NewInt32(0)
|
||||||
funcutil.ProcessFuncParallel(len(toProcess), hardware.GetCPUNum(), func(idx int) error {
|
funcutil.ProcessFuncParallel(len(toProcess), hardware.GetCPUNum(), func(idx int) error {
|
||||||
if !scheduler.process(toProcess[idx]) {
|
if scheduler.process(toProcess[idx]) {
|
||||||
failCount.Inc()
|
commmittedNum.Inc()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}, "process")
|
}, "process")
|
||||||
|
@ -565,7 +565,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
|
||||||
|
|
||||||
log.Info("processed tasks",
|
log.Info("processed tasks",
|
||||||
zap.Int("toProcessNum", len(toProcess)),
|
zap.Int("toProcessNum", len(toProcess)),
|
||||||
zap.Int32("failCount", failCount.Load()),
|
zap.Int32("committedNum", commmittedNum.Load()),
|
||||||
zap.Int("toRemoveNum", len(toRemove)),
|
zap.Int("toRemoveNum", len(toRemove)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -689,7 +689,7 @@ func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
|
||||||
zap.Int64("collectionID", task.CollectionID()),
|
zap.Int64("collectionID", task.CollectionID()),
|
||||||
zap.Int64("replicaID", task.ReplicaID()),
|
zap.Int64("replicaID", task.ReplicaID()),
|
||||||
zap.Int64("segmentID", task.SegmentID()),
|
zap.Int64("segmentID", task.SegmentID()),
|
||||||
zap.Int32("taskStatus", task.Status()),
|
zap.String("status", task.Status()),
|
||||||
zap.Error(task.err),
|
zap.Error(task.err),
|
||||||
)
|
)
|
||||||
meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err())
|
meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err())
|
||||||
|
@ -700,7 +700,7 @@ func (scheduler *taskScheduler) remove(task Task) {
|
||||||
zap.Int64("taskID", task.ID()),
|
zap.Int64("taskID", task.ID()),
|
||||||
zap.Int64("collectionID", task.CollectionID()),
|
zap.Int64("collectionID", task.CollectionID()),
|
||||||
zap.Int64("replicaID", task.ReplicaID()),
|
zap.Int64("replicaID", task.ReplicaID()),
|
||||||
zap.Int32("taskStatus", task.Status()),
|
zap.String("status", task.Status()),
|
||||||
)
|
)
|
||||||
task.Cancel(nil)
|
task.Cancel(nil)
|
||||||
scheduler.tasks.Remove(task.ID())
|
scheduler.tasks.Remove(task.ID())
|
||||||
|
@ -725,7 +725,7 @@ func (scheduler *taskScheduler) remove(task Task) {
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler.updateTaskMetrics()
|
scheduler.updateTaskMetrics()
|
||||||
log.Debug("task removed", zap.Stack("stack"))
|
log.Info("task removed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *taskScheduler) checkStale(task Task) error {
|
func (scheduler *taskScheduler) checkStale(task Task) error {
|
||||||
|
|
|
@ -33,16 +33,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Status = int32
|
Status = string
|
||||||
Priority int32
|
Priority int32
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TaskStatusCreated Status = iota + 1
|
TaskStatusCreated = "created"
|
||||||
TaskStatusStarted
|
TaskStatusStarted = "started"
|
||||||
TaskStatusSucceeded
|
TaskStatusSucceeded = "succeeded"
|
||||||
TaskStatusCanceled
|
TaskStatusCanceled = "canceled"
|
||||||
TaskStatusFailed
|
TaskStatusFailed = "failed"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -107,7 +107,7 @@ type baseTask struct {
|
||||||
loadType querypb.LoadType
|
loadType querypb.LoadType
|
||||||
|
|
||||||
source Source
|
source Source
|
||||||
status *atomic.Int32
|
status *atomic.String
|
||||||
priority Priority
|
priority Priority
|
||||||
err error
|
err error
|
||||||
actions []Action
|
actions []Action
|
||||||
|
@ -128,7 +128,7 @@ func newBaseTask(ctx context.Context, source Source, collectionID, replicaID Uni
|
||||||
replicaID: replicaID,
|
replicaID: replicaID,
|
||||||
shard: shard,
|
shard: shard,
|
||||||
|
|
||||||
status: atomic.NewInt32(TaskStatusStarted),
|
status: atomic.NewString(TaskStatusStarted),
|
||||||
priority: TaskPriorityNormal,
|
priority: TaskPriorityNormal,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
|
Loading…
Reference in New Issue