mirror of https://github.com/milvus-io/milvus.git
Add warning log to trace old messages (#13939)
Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/13990/head
parent
d39a4a3f2d
commit
6b91e5e9d1
|
@ -66,6 +66,8 @@ type insertBufferNode struct {
|
|||
timeTickStream msgstream.MsgStream
|
||||
ttLogger timeTickLogger
|
||||
ttMerger *mergedTimeTickerSender
|
||||
|
||||
lastTimestamp Timestamp
|
||||
}
|
||||
|
||||
type timeTickLogger struct {
|
||||
|
@ -190,6 +192,17 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||
endPositions = append(endPositions, pos)
|
||||
}
|
||||
|
||||
if startPositions[0].Timestamp < ibNode.lastTimestamp {
|
||||
log.Error("insert buffer node consumed old messages",
|
||||
zap.String("channel", ibNode.channelName),
|
||||
zap.Any("timestamp", startPositions[0].Timestamp),
|
||||
zap.Any("lastTimestamp", ibNode.lastTimestamp),
|
||||
)
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
ibNode.lastTimestamp = endPositions[0].Timestamp
|
||||
|
||||
// Updating segment statistics in replica
|
||||
seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0])
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue