mirror of https://github.com/milvus-io/milvus.git
Add a checker to listen for the timetick. (#7061)
issue: #7060 Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/7083/head
parent
5e78fd6a97
commit
5c383d1d02
|
@ -11,6 +11,7 @@ package datacoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -41,6 +42,14 @@ const (
|
|||
connEtcdRetryInterval = 200 * time.Millisecond
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO: sunby put to config
|
||||
enableTtChecker = true
|
||||
ttCheckerName = "dataTtChecker"
|
||||
ttMaxInterval = 3 * time.Minute
|
||||
ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
|
||||
)
|
||||
|
||||
type (
|
||||
UniqueID = typeutil.UniqueID
|
||||
Timestamp = typeutil.Timestamp
|
||||
|
@ -312,6 +321,12 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
|
|||
zap.String("subscriptionName", Params.DataCoordSubscriptionName))
|
||||
ttMsgStream.Start()
|
||||
defer ttMsgStream.Close()
|
||||
|
||||
var checker *LongTermChecker
|
||||
if enableTtChecker {
|
||||
checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
|
||||
checker.Start()
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -331,6 +346,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
ttMsg := msg.(*msgstream.DataNodeTtMsg)
|
||||
if enableTtChecker {
|
||||
checker.Check()
|
||||
}
|
||||
|
||||
ch := ttMsg.ChannelName
|
||||
ts := ttMsg.Timestamp
|
||||
|
|
|
@ -11,8 +11,12 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
)
|
||||
|
||||
|
@ -44,3 +48,43 @@ func VerifyResponse(response interface{}, err error) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LongTermChecker checks we receive at least one msg in d duration. If not, checker
|
||||
// will print a warn message.
|
||||
type LongTermChecker struct {
|
||||
d time.Duration
|
||||
t *time.Ticker
|
||||
ctx context.Context
|
||||
warn string
|
||||
name string
|
||||
}
|
||||
|
||||
func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
|
||||
c := &LongTermChecker{
|
||||
name: name,
|
||||
ctx: ctx,
|
||||
d: d,
|
||||
warn: warn,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *LongTermChecker) Start() {
|
||||
c.t = time.NewTicker(c.d)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
|
||||
return
|
||||
case <-c.t.C:
|
||||
log.Warn(c.warn)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Check reset the time ticker
|
||||
func (c *LongTermChecker) Check() {
|
||||
c.t.Reset(c.d)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue