mirror of https://github.com/milvus-io/milvus.git
Add default timestamp when send statistics of pchan (#5854)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/5869/head^2
parent
e70d359600
commit
2875b10dc5
|
@ -281,6 +281,12 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
|
|||
case <-node.ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
ts, err := node.tsoAllocator.AllocOne()
|
||||
if err != nil {
|
||||
log.Warn("Failed to get timestamp from tso", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
stats, err := node.chTicker.getMinTsStatistics()
|
||||
if err != nil {
|
||||
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err))
|
||||
|
@ -290,10 +296,15 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
|
|||
channels := make([]pChan, 0, len(stats))
|
||||
tss := make([]Timestamp, 0, len(stats))
|
||||
|
||||
maxTs := ts
|
||||
for channel, ts := range stats {
|
||||
channels = append(channels, channel)
|
||||
tss = append(tss, ts)
|
||||
if ts > maxTs {
|
||||
maxTs = ts
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("send timestamp statistics of pchan", zap.Any("channels", channels), zap.Any("tss", tss))
|
||||
|
||||
req := &internalpb.ChannelTimeTickMsg{
|
||||
|
@ -303,8 +314,9 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
|
|||
Timestamp: 0, // todo
|
||||
SourceID: node.session.ServerID,
|
||||
},
|
||||
ChannelNames: channels,
|
||||
Timestamps: tss,
|
||||
ChannelNames: channels,
|
||||
Timestamps: tss,
|
||||
DefaultTimestamp: maxTs,
|
||||
}
|
||||
|
||||
status, err := node.masterService.UpdateChannelTimeTick(node.ctx, req)
|
||||
|
|
Loading…
Reference in New Issue