mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/23284/head
parent
d9b4303056
commit
ac479318d0
|
@ -125,7 +125,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
|
|||
|
||||
lock: sync.Mutex{},
|
||||
sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
|
||||
|
||||
// 1 is the most reasonable capacity. In fact, Milvus can only focus on the latest time tick.
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 1),
|
||||
|
||||
syncedTtHistogram: newTtHistogram(),
|
||||
}
|
||||
|
@ -166,7 +168,16 @@ func (t *timetickSync) sendToChannel() {
|
|||
ptt[k] = v
|
||||
t.sess2ChanTsMap[k] = nil
|
||||
}
|
||||
t.sendChan <- ptt
|
||||
|
||||
select {
|
||||
case t.sendChan <- ptt:
|
||||
default:
|
||||
// The consumer of `sendChan` haven't completed its operation. If we send the `ptt` here, the consumer will
|
||||
// always get an older time tick. The older time tick in `sendChan` will block newer time tick in next window.
|
||||
// However, in fact the consumer can only focus on the newest.
|
||||
|
||||
// TODO: maybe a metric should be here.
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTimeTick check msg validation and send it to local channel
|
||||
|
|
Loading…
Reference in New Issue