Add timeticker logger for insert buffer node (#11216)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/11263/head
congqixia 2021-11-04 15:40:14 +08:00 committed by GitHub
parent b63a9521c2
commit 0ef95c5df1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 0 deletions

View File

@ -28,12 +28,14 @@ import (
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -65,6 +67,31 @@ type insertBufferNode struct {
timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream
ttLogger timeTickLogger
}
type timeTickLogger struct {
start atomic.Uint64
counter atomic.Int32
}
func (l *timeTickLogger) LogTs(ts Timestamp) {
if l.counter.Load() == 0 {
l.start.Store(ts)
}
l.counter.Inc()
if l.counter.Load() == 1000 {
min := l.start.Load()
l.start.Store(ts)
l.counter.Store(0)
go l.printLogs(min, ts)
}
}
func (l *timeTickLogger) printLogs(start, end Timestamp) {
t1, _ := tsoutil.ParseTS(start)
t2, _ := tsoutil.ParseTS(end)
log.Debug("IBN timetick log", zap.Time("from", t1), zap.Time("to", t2), zap.Duration("elapsed", t2.Sub(t1)), zap.Uint64("start", start), zap.Uint64("end", end))
}
type segmentCheckPoint struct {
@ -639,6 +666,7 @@ func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataTy
// writeHardTimeTick writes timetick once insertBufferNode operates.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
ibNode.ttLogger.LogTs(ts)
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
BaseMsg: msgstream.BaseMsg{

View File

@ -195,6 +195,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
// trigger log ts
iBNode.ttLogger.counter.Store(999)
flushChan <- flushMsg{
msgID: 1,
timestamp: 2000,