Add more logs for loading sealed segments (#14079)

Add logs before and after the flowgraph ddNode forwards delete messages

See also: #13879

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/14148/head
XuanYang-cn 2021-12-24 10:50:07 +08:00 committed by GitHub
parent 9a8f53fb5f
commit 3d24007efa
4 changed files with 19 additions and 8 deletions
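
Every hunk in this commit adds the same kind of structured log line: a fixed message plus zap fields carrying the variable context (channel name, msg id, primary-key count). As a minimal, self-contained sketch of that field style, assuming a plain go.uber.org/zap development logger rather than Milvus's internal log wrapper, and with made-up values for the channel name and primary keys:

package main

import "go.uber.org/zap"

func main() {
    // Throwaway development logger; the real code goes through Milvus's log wrapper.
    logger, _ := zap.NewDevelopment()
    defer logger.Sync()

    // Made-up stand-ins for dmsg.GetPrimaryKeys() and ddn.vchannelName.
    pks := []int64{101, 102, 103}
    vchannel := "example-dml-vchannel_0"

    // Constant message text; variable context travels as typed key/value fields,
    // mirroring the log.Debug calls added in the hunks below.
    logger.Debug("DDNode receive delete messages",
        zap.Int("num", len(pks)),
        zap.String("vChannelName", vchannel))
}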


@@ -147,8 +147,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
                 zap.String("vChannelName", ddn.vchannelName))
             fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
         case commonpb.MsgType_Delete:
-            log.Debug("DDNode receive delete messages")
             dmsg := msg.(*msgstream.DeleteMsg)
+            log.Debug("DDNode receive delete messages",
+                zap.Int("num", len(dmsg.GetPrimaryKeys())),
+                zap.String("vChannelName", ddn.vchannelName))
             for i := 0; i < len(dmsg.PrimaryKeys); i++ {
                 dmsg.HashValues = append(dmsg.HashValues, uint32(0))
             }
@@ -165,7 +167,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
     err := ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
     if err != nil {
         // TODO: proper deal with error
-        log.Warn("DDNode forward delete msg failed", zap.Error(err))
+        log.Warn("DDNode forward delete msg failed",
+            zap.String("vChannelName", ddn.vchannelName),
+            zap.Error(err))
     }
     fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)


@@ -89,7 +89,7 @@ func (dn *deleteNode) Close() {
 }

 func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
-    log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys))
+    log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
     segIDToPkMap := make(map[UniqueID][]int64)
     segIDToTsMap := make(map[UniqueID][]uint64)


@@ -472,11 +472,12 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
         deleteTimestamps: make(map[UniqueID][]Timestamp),
         deleteOffset:     make(map[UniqueID]int64),
     }
-    log.Debug("start read msg from stream reader")
+    log.Debug("start read msg from stream reader", zap.Any("msg id", position.GetMsgID()))
     for stream.HasNext(pChannelName) {
         ctx, cancel := context.WithTimeout(ctx, timeoutForEachRead)
         tsMsg, err := stream.Next(ctx, pChannelName)
         if err != nil {
+            log.Warn("fail to load delete", zap.String("pChannelName", pChannelName), zap.Any("msg id", position.GetMsgID()), zap.Error(err))
             cancel()
             return err
         }
@@ -491,12 +492,16 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
                 cancel()
                 continue
             }
-            log.Debug("delete pk", zap.Any("pk", dmsg.PrimaryKeys))
+            log.Debug("delete pk",
+                zap.Any("pk", dmsg.PrimaryKeys),
+                zap.String("vChannelName", position.GetChannelName()),
+                zap.Any("msg id", position.GetMsgID()),
+            )
             processDeleteMessages(loader.historicalReplica, dmsg, delData)
         }
         cancel()
     }
-    log.Debug("All data has been read, there is no more data", zap.String("channel", pChannelName))
+    log.Debug("All data has been read, there is no more data", zap.String("channel", pChannelName), zap.Any("msg id", position.GetMsgID()))
     for segmentID, pks := range delData.deleteIDs {
         segment, err := loader.historicalReplica.getSegmentByID(segmentID)
         if err != nil {
@@ -514,7 +519,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
     }
     wg.Wait()
     stream.Close()
-    log.Debug("from dml check point load done")
+    log.Debug("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
     return nil
 }
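
FromDmlCPLoadDelete wraps each stream.Next call in its own short-lived context.WithTimeout and cancels it before the next pass, so a single slow read cannot stall the whole load. A minimal sketch of that per-read timeout pattern, assuming a hypothetical readNext in place of stream.Next and an invented one-second timeout:

package main

import (
    "context"
    "fmt"
    "time"
)

// readNext is a hypothetical stand-in for stream.Next in the hunk above.
func readNext(ctx context.Context) (string, error) {
    select {
    case <-time.After(10 * time.Millisecond):
        return "delete msg", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    parent := context.Background()
    for i := 0; i < 3; i++ {
        // A fresh deadline per read bounds each blocking call on its own;
        // cancel() releases the timer before the next iteration starts.
        readCtx, cancel := context.WithTimeout(parent, time.Second)
        msg, err := readNext(readCtx)
        cancel()
        if err != nil {
            fmt.Println("fail to load delete:", err)
            return
        }
        fmt.Println("got:", msg)
    }
}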


@@ -619,7 +619,9 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
         zap.Any("toSubChannels", toSubChannels))
     for _, info := range w.req.Infos {
-        w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition)
+        if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil {
+            return errors.New("watchDeltaChannelsTask failed, error = " + err.Error())
+        }
     }
     // start flow graphs
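
The new check above turns the loader error into a fresh errors.New string. Purely as an illustrative alternative, not what this commit does, the cause could be wrapped with fmt.Errorf and %w so callers can still match it with errors.Is/errors.As:

package main

import (
    "errors"
    "fmt"
)

// fromDmlCPLoadDelete is a hypothetical stand-in for loader.FromDmlCPLoadDelete.
func fromDmlCPLoadDelete() error {
    return errors.New("stream read timed out")
}

func main() {
    if err := fromDmlCPLoadDelete(); err != nil {
        // %w keeps the original error in the chain instead of flattening it to a string.
        wrapped := fmt.Errorf("watchDeltaChannelsTask failed: %w", err)
        fmt.Println(wrapped)                // watchDeltaChannelsTask failed: stream read timed out
        fmt.Println(errors.Unwrap(wrapped)) // stream read timed out
    }
}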