mirror of https://github.com/milvus-io/milvus.git
Refine usage of TimeRecorder.Record (#23142)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/23136/head
parent
a182594bf5
commit
1287ca699a
|
@ -168,7 +168,7 @@ func (c *ChannelStore) Reload() error {
|
|||
log.Info("channel store reload channel",
|
||||
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
|
||||
}
|
||||
record.Record("ChannelStore reload")
|
||||
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -150,8 +150,7 @@ func (m *meta) reloadFromKV() error {
|
|||
for _, segIdx := range segmentIndexes {
|
||||
m.updateSegmentIndex(segIdx)
|
||||
}
|
||||
|
||||
record.Record("meta reloadFromKV")
|
||||
log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -262,7 +262,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
buildIndexLatency := it.tr.Record("build index done")
|
||||
buildIndexLatency := it.tr.RecordSpan()
|
||||
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds()))
|
||||
|
||||
indexBlobs, err := it.index.Serialize()
|
||||
|
@ -270,7 +270,9 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
|
|||
log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
it.tr.Record("index serialize done")
|
||||
|
||||
log.Ctx(ctx).Info("index serialize done", zap.Int64("buildID", it.BuildID),
|
||||
zap.Duration("duration", it.tr.RecordSpan()))
|
||||
|
||||
// use serialized size before encoding
|
||||
it.serializedSize = 0
|
||||
|
@ -301,7 +303,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
|
|||
log.Warn("failed to serialize index", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
encodeIndexFileDur := it.tr.Record("index codec serialize done")
|
||||
encodeIndexFileDur := it.tr.RecordSpan()
|
||||
metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
|
||||
it.indexBlobs = serializedIndexBlobs
|
||||
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID),
|
||||
|
@ -379,7 +381,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
buildIndexLatency := it.tr.Record("build index done")
|
||||
buildIndexLatency := it.tr.RecordSpan()
|
||||
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds()))
|
||||
|
||||
fileInfos, err := it.index.GetIndexFileInfo()
|
||||
|
@ -387,7 +389,9 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
|
|||
log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
it.tr.Record("index serialize done")
|
||||
|
||||
log.Ctx(ctx).Info("index serialize done", zap.Int64("buildID", it.BuildID),
|
||||
zap.Duration("duration", it.tr.RecordSpan()))
|
||||
|
||||
// use serialized size before encoding
|
||||
it.serializedSize = 0
|
||||
|
@ -404,7 +408,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
|
|||
log.Ctx(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
|
||||
}
|
||||
|
||||
encodeIndexFileDur := it.tr.Record("index codec serialize done")
|
||||
encodeIndexFileDur := it.tr.RecordSpan()
|
||||
metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
@ -445,7 +449,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
|
|||
it.statistic.EndTime = time.Now().UnixMicro()
|
||||
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
|
||||
log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
|
||||
saveIndexFileDur := it.tr.Record("index file save done")
|
||||
saveIndexFileDur := it.tr.RecordSpan()
|
||||
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
|
||||
it.tr.Elapse("index building all done")
|
||||
log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
|
||||
|
@ -505,7 +509,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
|
|||
it.statistic.EndTime = time.Now().UnixMicro()
|
||||
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
|
||||
log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
|
||||
saveIndexFileDur := it.tr.Record("index file save done")
|
||||
saveIndexFileDur := it.tr.RecordSpan()
|
||||
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
|
||||
it.tr.Elapse("index building all done")
|
||||
log.Ctx(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
|
||||
|
@ -529,14 +533,15 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
|
|||
it.partitionID = partitionID
|
||||
it.segmentID = segmentID
|
||||
|
||||
log.Ctx(ctx).Info("indexnode deserialize data success",
|
||||
deserializeDur := it.tr.RecordSpan()
|
||||
|
||||
log.Ctx(ctx).Info("IndexNode deserialize data success",
|
||||
zap.Int64("index id", it.req.IndexID),
|
||||
zap.String("index name", it.req.IndexName),
|
||||
zap.Int64("collectionID", it.collectionID),
|
||||
zap.Int64("partitionID", it.partitionID),
|
||||
zap.Int64("segmentID", it.segmentID))
|
||||
|
||||
it.tr.Record("deserialize vector data done")
|
||||
zap.Int64("segmentID", it.segmentID),
|
||||
zap.Duration("deserialize duration", deserializeDur))
|
||||
|
||||
// we can ensure that there blobs are in one Field
|
||||
var data storage.FieldData
|
||||
|
|
|
@ -236,13 +236,6 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
|
|||
}
|
||||
dt.deleteMsg.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames)
|
||||
|
||||
log.Debug("send delete request to virtual channels",
|
||||
zap.String("collection", dt.deleteMsg.GetCollectionName()),
|
||||
zap.Int64("collection_id", collID),
|
||||
zap.Strings("virtual_channels", channelNames),
|
||||
zap.Int64("task_id", dt.ID()))
|
||||
|
||||
tr.Record("get vchannels")
|
||||
// repack delete msg by dmChannel
|
||||
result := make(map[uint32]msgstream.TsMsg)
|
||||
collectionName := dt.deleteMsg.CollectionName
|
||||
|
@ -301,14 +294,20 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
tr.Record("pack messages")
|
||||
log.Debug("send delete request to virtual channels",
|
||||
zap.String("collection", dt.deleteMsg.GetCollectionName()),
|
||||
zap.Int64("collection_id", collID),
|
||||
zap.Strings("virtual_channels", channelNames),
|
||||
zap.Int64("task_id", dt.ID()),
|
||||
zap.Duration("prepare duration", tr.RecordSpan()))
|
||||
|
||||
err = stream.Produce(msgPack)
|
||||
if err != nil {
|
||||
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
dt.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
sendMsgDur := tr.Record("send delete request to dml channels")
|
||||
sendMsgDur := tr.ElapseSpan()
|
||||
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds()))
|
||||
|
||||
return nil
|
||||
|
|
|
@ -174,7 +174,6 @@ func (it *insertTask) Execute(ctx context.Context) error {
|
|||
defer sp.End()
|
||||
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute insert %d", it.ID()))
|
||||
defer tr.Elapse("insert execute done")
|
||||
|
||||
collectionName := it.insertMsg.CollectionName
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
|
@ -195,13 +194,12 @@ func (it *insertTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
it.insertMsg.PartitionID = partitionID
|
||||
tr.Record("get collection id & partition id from cache")
|
||||
|
||||
getCacheDur := tr.RecordSpan()
|
||||
stream, err := it.chMgr.getOrCreateDmlStream(collID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr.Record("get used message stream")
|
||||
getMsgStreamDur := tr.RecordSpan()
|
||||
|
||||
channelNames, err := it.chMgr.getVChannels(collID)
|
||||
if err != nil {
|
||||
|
@ -219,7 +217,9 @@ func (it *insertTask) Execute(ctx context.Context) error {
|
|||
zap.Int64("collection_id", collID),
|
||||
zap.Int64("partition_id", partitionID),
|
||||
zap.Strings("virtual_channels", channelNames),
|
||||
zap.Int64("task_id", it.ID()))
|
||||
zap.Int64("task_id", it.ID()),
|
||||
zap.Duration("get cache duration", getCacheDur),
|
||||
zap.Duration("get msgStream duration", getMsgStreamDur))
|
||||
|
||||
// assign segmentID for insert data and repack data by segmentID
|
||||
var msgPack *msgstream.MsgPack
|
||||
|
@ -232,21 +232,25 @@ func (it *insertTask) Execute(ctx context.Context) error {
|
|||
it.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
assignSegmentIDDur := tr.RecordSpan()
|
||||
|
||||
log.Debug("assign segmentID for insert data success",
|
||||
zap.Int64("collectionID", collID),
|
||||
zap.String("collectionName", it.insertMsg.CollectionName))
|
||||
tr.Record("assign segment id")
|
||||
zap.String("collectionName", it.insertMsg.CollectionName),
|
||||
zap.Duration("assign segmentID duration", assignSegmentIDDur))
|
||||
err = stream.Produce(msgPack)
|
||||
if err != nil {
|
||||
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
it.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
sendMsgDur := tr.Record("send insert request to dml channel")
|
||||
sendMsgDur := tr.RecordSpan()
|
||||
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
|
||||
|
||||
totalExecDur := tr.ElapseSpan()
|
||||
log.Debug("Proxy Insert Execute done",
|
||||
zap.String("collectionName", collectionName))
|
||||
zap.String("collectionName", collectionName),
|
||||
zap.Duration("send message duration", sendMsgDur),
|
||||
zap.Duration("execute duration", totalExecDur))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -336,14 +336,13 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
}
|
||||
}
|
||||
it.upsertMsg.InsertMsg.PartitionID = partitionID
|
||||
tr.Record("get collection id & partition id from cache when insertExecute")
|
||||
getCacheDur := tr.RecordSpan()
|
||||
|
||||
_, err = it.chMgr.getOrCreateDmlStream(collID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr.Record("get used message stream when insertExecute")
|
||||
|
||||
getMsgStreamDur := tr.RecordSpan()
|
||||
channelNames, err := it.chMgr.getVChannels(collID)
|
||||
if err != nil {
|
||||
log.Error("get vChannels failed when insertExecute",
|
||||
|
@ -359,7 +358,9 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
zap.Int64("collection_id", collID),
|
||||
zap.Int64("partition_id", partitionID),
|
||||
zap.Strings("virtual_channels", channelNames),
|
||||
zap.Int64("task_id", it.ID()))
|
||||
zap.Int64("task_id", it.ID()),
|
||||
zap.Duration("get cache duration", getCacheDur),
|
||||
zap.Duration("get msgStream duration", getMsgStreamDur))
|
||||
|
||||
// assign segmentID for insert data and repack data by segmentID
|
||||
insertMsgPack, err := assignSegmentID(it.TraceCtx(), it.upsertMsg.InsertMsg, it.result, channelNames, it.idAllocator, it.segIDAssigner)
|
||||
|
@ -370,9 +371,10 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
it.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
assignSegmentIDDur := tr.RecordSpan()
|
||||
log.Debug("assign segmentID for insert data success when insertExecute",
|
||||
zap.String("collectionName", it.req.CollectionName))
|
||||
tr.Record("assign segment id")
|
||||
zap.String("collectionName", it.req.CollectionName),
|
||||
zap.Duration("assign segmentID duration", assignSegmentIDDur))
|
||||
msgPack.Msgs = append(msgPack.Msgs, insertMsgPack.Msgs...)
|
||||
|
||||
log.Debug("Proxy Insert Execute done when upsert",
|
||||
|
@ -383,8 +385,6 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
|
||||
func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgPack) (err error) {
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy deleteExecute upsert %d", it.ID()))
|
||||
defer tr.Elapse("delete execute done when upsert")
|
||||
|
||||
collID := it.upsertMsg.DeleteMsg.CollectionID
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("collectionID", collID))
|
||||
|
@ -399,11 +399,6 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
it.upsertMsg.DeleteMsg.PrimaryKeys = it.result.IDs
|
||||
it.upsertMsg.DeleteMsg.HashValues = typeutil.HashPK2Channels(it.upsertMsg.DeleteMsg.PrimaryKeys, channelNames)
|
||||
|
||||
log.Debug("send delete request to virtual channels when deleteExecute",
|
||||
zap.Int64("collection_id", collID),
|
||||
zap.Strings("virtual_channels", channelNames))
|
||||
|
||||
tr.Record("get vchannels")
|
||||
// repack delete msg by dmChannel
|
||||
result := make(map[uint32]msgstream.TsMsg)
|
||||
collectionName := it.upsertMsg.DeleteMsg.CollectionName
|
||||
|
@ -463,7 +458,9 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP
|
|||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, deleteMsgPack.Msgs...)
|
||||
|
||||
log.Debug("Proxy Upsert deleteExecute done")
|
||||
log.Debug("Proxy Upsert deleteExecute done", zap.Int64("collection_id", collID),
|
||||
zap.Strings("virtual_channels", channelNames), zap.Int64("taskID", it.ID()),
|
||||
zap.Duration("prepare duration", tr.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -493,16 +490,18 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
tr.Record("pack messages in upsert")
|
||||
tr.RecordSpan()
|
||||
err = stream.Produce(msgPack)
|
||||
if err != nil {
|
||||
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
it.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
sendMsgDur := tr.Record("send upsert request to dml channels")
|
||||
sendMsgDur := tr.RecordSpan()
|
||||
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
|
||||
log.Debug("Proxy Upsert Execute done")
|
||||
totalDur := tr.ElapseSpan()
|
||||
log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
|
||||
zap.Duration("total duration", totalDur))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -320,8 +320,7 @@ func (s *Server) initMeta() error {
|
|||
LeaderViewManager: meta.NewLeaderViewManager(),
|
||||
}
|
||||
s.targetMgr = meta.NewTargetManager(s.broker, s.meta)
|
||||
|
||||
record.Record("Server initMeta")
|
||||
log.Info("QueryCoord server initMeta done", zap.Duration("duration", record.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -75,9 +75,9 @@ func (b *baseReadTask) SetStep(step TaskStep) {
|
|||
switch step {
|
||||
case TaskStepEnqueue:
|
||||
b.queueDur = 0
|
||||
b.tr.Record("enqueue done")
|
||||
b.tr.RecordSpan()
|
||||
case TaskStepPreExecute:
|
||||
b.queueDur = b.tr.Record("start to process")
|
||||
b.queueDur = b.tr.RecordSpan()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -108,9 +108,9 @@ func (b *baseReadTask) PostExecute(ctx context.Context) error {
|
|||
func (b *baseReadTask) Notify(err error) {
|
||||
switch b.step {
|
||||
case TaskStepEnqueue:
|
||||
b.queueDur = b.tr.Record("enqueueEnd")
|
||||
b.queueDur = b.tr.RecordSpan()
|
||||
case TaskStepPostExecute:
|
||||
b.tr.Record("execute task done")
|
||||
b.tr.RecordSpan()
|
||||
}
|
||||
b.baseTask.Notify(err)
|
||||
}
|
||||
|
|
|
@ -148,8 +148,7 @@ func (mt *MetaTable) reload() error {
|
|||
|
||||
metrics.RootCoordNumOfCollections.Set(float64(collectionNum))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(partitionNum))
|
||||
|
||||
record.Record("MetaTable reload")
|
||||
log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue