mirror of https://github.com/milvus-io/milvus.git
Standard indexcoord log printing (#8588)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/8630/head
parent
df30315144
commit
9e37881b29
|
@ -115,7 +115,7 @@ func (i *IndexCoord) Init() error {
|
|||
var initErr error = nil
|
||||
Params.InitOnce()
|
||||
i.initOnce.Do(func() {
|
||||
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
|
||||
log.Debug("IndexCoord", zap.Strings("etcd endpoints", Params.EtcdEndpoints))
|
||||
i.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
|
@ -133,7 +133,7 @@ func (i *IndexCoord) Init() error {
|
|||
log.Debug("IndexCoord try to connect etcd")
|
||||
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord try to connect etcd failed", zap.Error(err))
|
||||
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
@ -141,9 +141,9 @@ func (i *IndexCoord) Init() error {
|
|||
i.nodeManager = NewNodeManager()
|
||||
|
||||
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
|
||||
log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision))
|
||||
log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err))
|
||||
log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
@ -151,13 +151,13 @@ func (i *IndexCoord) Init() error {
|
|||
session := session
|
||||
go func() {
|
||||
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
|
||||
log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID),
|
||||
zap.Any("Add IndexNode error", err))
|
||||
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
|
||||
zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1)
|
||||
nodeTasks := i.metaTable.GetNodeTaskStats()
|
||||
for nodeID, taskNum := range nodeTasks {
|
||||
|
@ -168,20 +168,20 @@ func (i *IndexCoord) Init() error {
|
|||
kvRootPath := Params.KvRootPath
|
||||
etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid")
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord TSOKVBase initialize failed", zap.Error(err))
|
||||
log.Error("IndexCoord TSOKVBase initialize failed", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
||||
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV)
|
||||
if err := i.idAllocator.Initialize(); err != nil {
|
||||
log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err))
|
||||
log.Error("IndexCoord idAllocator initialize failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
i.ID, err = i.idAllocator.AllocOne()
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord idAllocator allocOne failed", zap.Error(err))
|
||||
log.Error("IndexCoord idAllocator allocOne failed", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ func (i *IndexCoord) Init() error {
|
|||
|
||||
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord new minio kv failed", zap.Error(err))
|
||||
log.Error("IndexCoord new minio kv failed", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ func (i *IndexCoord) Init() error {
|
|||
|
||||
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable)
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord new task scheduler failed", zap.Error(err))
|
||||
log.Error("IndexCoord new task scheduler failed", zap.Error(err))
|
||||
initErr = err
|
||||
return
|
||||
}
|
||||
|
@ -430,7 +430,7 @@ func (i *IndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexSt
|
|||
}
|
||||
|
||||
func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("IndexCoord DropIndex", zap.Any("IndexID", req.IndexID))
|
||||
log.Debug("IndexCoord DropIndex", zap.Int64("IndexID", req.IndexID))
|
||||
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex")
|
||||
defer sp.Finish()
|
||||
|
||||
|
@ -453,7 +453,7 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
|
|||
}()
|
||||
}()
|
||||
|
||||
log.Debug("IndexCoord DropIndex success", zap.Any("IndexID", req.IndexID))
|
||||
log.Debug("IndexCoord DropIndex success", zap.Int64("IndexID", req.IndexID))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
@ -505,7 +505,7 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
|
|||
|
||||
metricType, err := metricsinfo.ParseMetricType(req.Request)
|
||||
if err != nil {
|
||||
log.Warn("IndexCoord.GetMetrics failed to parse metric type",
|
||||
log.Error("IndexCoord.GetMetrics failed to parse metric type",
|
||||
zap.Int64("node_id", i.ID),
|
||||
zap.String("req", req.Request),
|
||||
zap.Error(err))
|
||||
|
@ -568,7 +568,7 @@ func (i *IndexCoord) tsLoop() {
|
|||
select {
|
||||
case <-tsoTicker.C:
|
||||
if err := i.idAllocator.UpdateID(); err != nil {
|
||||
log.Debug("IndexCoord tsLoop UpdateID failed", zap.Error(err))
|
||||
log.Error("IndexCoord tsLoop UpdateID failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
@ -603,8 +603,8 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
|
|||
log.Debug("IndexCoord recycleUnusedIndexFiles",
|
||||
zap.Int64("Recycle the index files for deleted index with indexBuildID", meta.indexMeta.IndexBuildID))
|
||||
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
|
||||
log.Debug("IndexCoord recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Any("MarkDeleted", true), zap.Error(err))
|
||||
log.Error("IndexCoord recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Bool("MarkDeleted", true), zap.Error(err))
|
||||
}
|
||||
i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID)
|
||||
log.Debug("IndexCoord recycleUnusedIndexFiles",
|
||||
|
@ -615,12 +615,12 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
|
|||
for j := 1; j < int(meta.indexMeta.Version); j++ {
|
||||
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j)
|
||||
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
|
||||
log.Debug("IndexCoord recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Any("MarkDeleted", false), zap.Error(err))
|
||||
log.Error("IndexCoord recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Bool("MarkDeleted", false), zap.Error(err))
|
||||
}
|
||||
}
|
||||
if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil {
|
||||
log.Debug("IndexCoord recycleUnusedIndexFiles UpdateRecycleState failed", zap.Error(err))
|
||||
log.Error("IndexCoord recycleUnusedIndexFiles UpdateRecycleState failed", zap.Error(err))
|
||||
}
|
||||
log.Debug("IndexCoord recycleUnusedIndexFiles",
|
||||
zap.Int64("Recycle the low version index files successfully of the index with indexBuildID", meta.indexMeta.IndexBuildID))
|
||||
|
@ -647,19 +647,19 @@ func (i *IndexCoord) watchNodeLoop() {
|
|||
switch event.EventType {
|
||||
case sessionutil.SessionAddEvent:
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID),
|
||||
zap.Any("address", event.Session.Address))
|
||||
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
|
||||
zap.String("address", event.Session.Address))
|
||||
go func() {
|
||||
err := i.nodeManager.AddNode(serverID, event.Session.Address)
|
||||
if err != nil {
|
||||
log.Error("IndexCoord", zap.Any("Add IndexNode err", err))
|
||||
}
|
||||
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
}()
|
||||
i.metricsCacheManager.InvalidateSystemInfoMetrics()
|
||||
case sessionutil.SessionDelEvent:
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Any("serverID", serverID))
|
||||
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
|
||||
i.nodeManager.RemoveNode(serverID)
|
||||
i.metricsCacheManager.InvalidateSystemInfoMetrics()
|
||||
}
|
||||
|
@ -689,11 +689,11 @@ func (i *IndexCoord) watchMetaLoop() {
|
|||
err := proto.UnmarshalText(string(event.Kv.Value), indexMeta)
|
||||
indexBuildID := indexMeta.IndexBuildID
|
||||
log.Debug("IndexCoord watchMetaLoop", zap.Any("event.Key", event.Kv.Key),
|
||||
zap.Any("event.V", indexMeta), zap.Any("IndexBuildID", indexBuildID), zap.Error(err))
|
||||
zap.Any("event.V", indexMeta), zap.Int64("IndexBuildID", indexBuildID), zap.Error(err))
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
|
||||
log.Debug("IndexCoord watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload))
|
||||
log.Debug("IndexCoord watchMetaLoop PUT", zap.Int64("IndexBuildID", indexBuildID), zap.Bool("reload", reload))
|
||||
if reload {
|
||||
log.Debug("This task has finished", zap.Int64("indexBuildID", indexBuildID),
|
||||
zap.Int64("Finish by IndexNode", indexMeta.NodeID),
|
||||
|
@ -701,7 +701,7 @@ func (i *IndexCoord) watchMetaLoop() {
|
|||
i.nodeManager.pq.IncPriority(indexMeta.NodeID, -1)
|
||||
}
|
||||
case mvccpb.DELETE:
|
||||
log.Debug("IndexCoord watchMetaLoop DELETE", zap.Any("The meta has been deleted of indexBuildID", indexBuildID))
|
||||
log.Debug("IndexCoord watchMetaLoop DELETE", zap.Int64("The meta has been deleted of indexBuildID", indexBuildID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -714,12 +714,12 @@ func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.Crea
|
|||
defer cancel()
|
||||
resp, err := builderClient.CreateIndex(ctx, req)
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
|
||||
log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
|
||||
log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
@ -742,7 +742,7 @@ func (i *IndexCoord) assignTaskLoop() {
|
|||
case <-timeTicker.C:
|
||||
sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole)
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err))
|
||||
log.Error("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err))
|
||||
}
|
||||
if len(sessions) <= 0 {
|
||||
log.Debug("There is no IndexNode available as this time.")
|
||||
|
@ -752,21 +752,22 @@ func (i *IndexCoord) assignTaskLoop() {
|
|||
for _, session := range sessions {
|
||||
serverIDs = append(serverIDs, session.ServerID)
|
||||
}
|
||||
log.Debug("IndexCoord assignTaskLoop", zap.Any("Available IndexNode IDs", serverIDs))
|
||||
log.Debug("IndexCoord assignTaskLoop", zap.Int64s("Available IndexNode IDs", serverIDs))
|
||||
metas := i.metaTable.GetUnassignedTasks(serverIDs)
|
||||
sort.Slice(metas, func(i, j int) bool {
|
||||
return metas[i].indexMeta.Version <= metas[j].indexMeta.Version
|
||||
})
|
||||
log.Debug("IndexCoord assignTaskLoop", zap.Any("Unassigned tasks number", len(metas)), zap.Any("Unassigned tasks meta", metas))
|
||||
log.Debug("IndexCoord assignTaskLoop", zap.Int("Unassigned tasks number", len(metas)))
|
||||
for index, meta := range metas {
|
||||
indexBuildID := meta.indexMeta.IndexBuildID
|
||||
if err = i.metaTable.UpdateVersion(indexBuildID); err != nil {
|
||||
log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
|
||||
log.Warn("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
log.Debug("The version of the task has been updated", zap.Int64("indexBuildID", indexBuildID))
|
||||
nodeID, builderClient := i.nodeManager.PeekClient()
|
||||
if builderClient == nil {
|
||||
log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode")
|
||||
log.Warn("IndexCoord assignmentTasksLoop can not find available IndexNode")
|
||||
break
|
||||
}
|
||||
log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID))
|
||||
|
@ -781,11 +782,12 @@ func (i *IndexCoord) assignTaskLoop() {
|
|||
IndexParams: meta.indexMeta.Req.IndexParams,
|
||||
}
|
||||
if !i.assignTask(builderClient, req) {
|
||||
log.Debug("IndexCoord assignTask assign task to IndexNode failed")
|
||||
log.Warn("IndexCoord assignTask assign task to IndexNode failed")
|
||||
continue
|
||||
}
|
||||
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
|
||||
log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
|
||||
log.Error("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
|
||||
break
|
||||
}
|
||||
log.Debug("This task has been assigned", zap.Int64("indexBuildID", indexBuildID),
|
||||
zap.Int64("The IndexNode execute this task", nodeID))
|
||||
|
|
|
@ -158,7 +158,7 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
|
|||
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
log.Debug("IndexCoord metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID))
|
||||
log.Error("IndexCoord metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID))
|
||||
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
|
||||
|
@ -194,7 +194,7 @@ func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
|
|||
log.Debug("IndexCoord metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID))
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
log.Debug("IndexCoord metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID))
|
||||
log.Warn("IndexCoord metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID))
|
||||
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
|
||||
|
@ -236,7 +236,7 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
|
|||
// marshal inside
|
||||
/* #nosec G601 */
|
||||
if err := mt.saveIndexMeta(&meta); err != nil {
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err))
|
||||
log.Error("IndexCoord metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err))
|
||||
fn := func() error {
|
||||
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
|
||||
if m == nil {
|
||||
|
@ -338,7 +338,7 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
|
|||
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
|
||||
if err2 != nil {
|
||||
meta.indexMeta.Recycled = false
|
||||
log.Debug("IndexCoord metaTable UpdateRecycleState failed", zap.Error(err2))
|
||||
log.Error("IndexCoord metaTable UpdateRecycleState failed", zap.Error(err2))
|
||||
return err2
|
||||
}
|
||||
}
|
||||
|
@ -471,13 +471,13 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
|
|||
return false
|
||||
}
|
||||
} else {
|
||||
log.Debug("Index not exist", zap.Int64("IndexBuildID", indexBuildID))
|
||||
log.Error("Index not exist", zap.Int64("IndexBuildID", indexBuildID))
|
||||
return false
|
||||
}
|
||||
|
||||
m, err := mt.reloadMeta(indexBuildID)
|
||||
if m == nil {
|
||||
log.Debug("IndexCoord metaTable reloadMeta failed", zap.Error(err))
|
||||
log.Error("IndexCoord metaTable reloadMeta failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ func (nm *NodeManager) RemoveNode(nodeID UniqueID) {
|
|||
func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
|
||||
log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", address))
|
||||
if nm.pq.CheckExist(nodeID) {
|
||||
log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
|
||||
log.Warn("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue