mirror of https://github.com/milvus-io/milvus.git
enhance: add the tick log for the tt msgstream seek method (#34397)
/kind improvement Signed-off-by: SimFG <bang.fu@zilliz.com>pull/34346/head
parent
8a2be8a457
commit
c6e2900cfc
|
@ -888,6 +888,9 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition,
|
|||
ms.consumerLock.Lock()
|
||||
defer ms.consumerLock.Unlock()
|
||||
|
||||
loopTick := time.NewTicker(5 * time.Second)
|
||||
defer loopTick.Stop()
|
||||
|
||||
for idx := range msgPositions {
|
||||
mp = msgPositions[idx]
|
||||
if len(mp.MsgID) == 0 {
|
||||
|
@ -903,16 +906,21 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition,
|
|||
|
||||
// skip all data before current tt
|
||||
runLoop := true
|
||||
loopMsgCnt := 0
|
||||
loopStarTime := time.Now()
|
||||
for runLoop {
|
||||
select {
|
||||
case <-ms.ctx.Done():
|
||||
return ms.ctx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-loopTick.C:
|
||||
log.Info("seek loop tick", zap.Int("loopMsgCnt", loopMsgCnt), zap.String("channel", mp.ChannelName))
|
||||
case msg, ok := <-consumer.Chan():
|
||||
if !ok {
|
||||
return fmt.Errorf("consumer closed")
|
||||
}
|
||||
loopMsgCnt++
|
||||
consumer.Ack(msg)
|
||||
|
||||
headerMsg := commonpb.MsgHeader{}
|
||||
|
@ -926,6 +934,12 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition,
|
|||
}
|
||||
if tsMsg.Type() == commonpb.MsgType_TimeTick && tsMsg.BeginTs() >= mp.Timestamp {
|
||||
runLoop = false
|
||||
if time.Since(loopStarTime) > 30*time.Second {
|
||||
log.Info("seek loop finished long time",
|
||||
zap.Int("loopMsgCnt", loopMsgCnt),
|
||||
zap.String("channel", mp.ChannelName),
|
||||
zap.Duration("cost", time.Since(loopStarTime)))
|
||||
}
|
||||
} else if tsMsg.BeginTs() > mp.Timestamp {
|
||||
ctx, _ := ExtractCtx(tsMsg, msg.Properties())
|
||||
tsMsg.SetTraceCtx(ctx)
|
||||
|
|
Loading…
Reference in New Issue