diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 05d88bf538..a5a1dd9d59 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -59,8 +59,8 @@ var ( // TODO: sunby put to config enableTtChecker = true ttCheckerName = "dataTtChecker" - ttMaxInterval = 3 * time.Minute - ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes()) + ttMaxInterval = 2 * time.Minute + ttCheckerWarnMsg = fmt.Sprintf("Datacoord haven't received tt for %f minutes", ttMaxInterval.Minutes()) segmentTimedFlushDuration = 10.0 ) diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index 6c5177ac15..d9c2fe1a78 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -34,13 +34,13 @@ import ( // proxyManager manages proxy connected to the rootcoord type proxyManager struct { - ctx context.Context - cancel context.CancelFunc - lock sync.Mutex - etcdCli *clientv3.Client - getSessions []func([]*sessionutil.Session) - addSessions []func(*sessionutil.Session) - delSessions []func(*sessionutil.Session) + ctx context.Context + cancel context.CancelFunc + lock sync.Mutex + etcdCli *clientv3.Client + initSessionsFunc []func([]*sessionutil.Session) + addSessionsFunc []func(*sessionutil.Session) + delSessionsFunc []func(*sessionutil.Session) } // newProxyManager helper function to create a proxyManager @@ -54,22 +54,22 @@ func newProxyManager(ctx context.Context, client *clientv3.Client, fns ...func([ lock: sync.Mutex{}, etcdCli: client, } - p.getSessions = append(p.getSessions, fns...) + p.initSessionsFunc = append(p.initSessionsFunc, fns...) return p } -// AddSession adds functions to addSessions function list -func (p *proxyManager) AddSession(fns ...func(*sessionutil.Session)) { +// AddSessionFunc adds functions to addSessions function list +func (p *proxyManager) AddSessionFunc(fns ...func(*sessionutil.Session)) { p.lock.Lock() defer p.lock.Unlock() - p.addSessions = append(p.addSessions, fns...) + p.addSessionsFunc = append(p.addSessionsFunc, fns...) } -// DelSession add functions to delSessions function list -func (p *proxyManager) DelSession(fns ...func(*sessionutil.Session)) { +// DelSessionFunc add functions to delSessions function list +func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) { p.lock.Lock() defer p.lock.Unlock() - p.delSessions = append(p.delSessions, fns...) + p.delSessionsFunc = append(p.delSessionsFunc, fns...) } // WatchProxy starts a goroutine to watch proxy session changes on etcd @@ -81,8 +81,8 @@ func (p *proxyManager) WatchProxy() error { if err != nil { return err } - log.Debug("succeed to get sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev)) - for _, f := range p.getSessions { + log.Debug("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev)) + for _, f := range p.initSessionsFunc { f(sessions) } @@ -136,7 +136,7 @@ func (p *proxyManager) handlePutEvent(e *clientv3.Event) error { return err } log.Debug("received proxy put event with session", zap.Any("session", session)) - for _, f := range p.addSessions { + for _, f := range p.addSessionsFunc { f(session) } metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(1) @@ -149,7 +149,7 @@ func (p *proxyManager) handleDeleteEvent(e *clientv3.Event) error { return err } log.Debug("received proxy delete event with session", zap.Any("session", session)) - for _, f := range p.delSessions { + for _, f := range p.delSessionsFunc { f(session) } metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(0) diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index e38062b214..f7a580ed5f 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -76,8 +76,8 @@ func TestProxyManager(t *testing.T) { assert.Equal(t, int64(100), sess.ServerID) t.Log("del session", sess) } - pm.AddSession(fa) - pm.DelSession(fd) + pm.AddSessionFunc(fa) + pm.DelSessionFunc(fd) err = pm.WatchProxy() assert.Nil(t, err) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index e682938ddd..05e1cb1e7f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand" "strconv" + "strings" "sync" "sync/atomic" "syscall" @@ -430,21 +431,6 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t return segID2PartID, nil } -func (c *Core) setDdMsgSendFlag(b bool) error { - flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix) - if err != nil { - return err - } - - if (b && flag == "true") || (!b && flag == "false") { - log.Debug("DdMsg send flag need not change", zap.String("flag", flag)) - return nil - } - - err = c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(b)) - return err -} - func (c *Core) setMsgStreams() error { if Params.PulsarCfg.Address == "" { return fmt.Errorf("pulsar address is empty") @@ -1043,11 +1029,11 @@ func (c *Core) Init() error { c.proxyManager = newProxyManager( c.ctx, c.etcdCli, - c.chanTimeTick.clearSessions, + c.chanTimeTick.initSessions, c.proxyClientManager.GetProxyClients, ) - c.proxyManager.AddSession(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient) - c.proxyManager.DelSession(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient) + c.proxyManager.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient) + c.proxyManager.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient) c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() @@ -1066,8 +1052,21 @@ func (c *Core) Init() error { func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { if !force { flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix) - if err != nil || flag == "true" { - log.Debug("No un-successful DdMsg") + if err != nil { + // TODO, this is super ugly hack but our kv interface does not support loadWithExist + // leave it for later + if strings.Contains(err.Error(), "there is no value on key") { + log.Debug("skip reSendDdMsg with no dd-msg-send key") + return nil + } + return err + } + value, err := strconv.ParseBool(flag) + if err != nil { + return err + } + if value { + log.Debug("skip reSendDdMsg with dd-msg-send set to true") return nil } } @@ -1167,7 +1166,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { } // Update DDOperation in etcd - return c.setDdMsgSendFlag(true) + return c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true)) } // Start start rootcoord diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 50405c6b6c..fdf1ccb060 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -19,6 +19,7 @@ package rootcoord import ( "context" "fmt" + "strconv" "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/common" @@ -254,7 +255,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { } // Update DDOperation in etcd - return t.core.setDdMsgSendFlag(true) + return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true)) } // DropCollectionReqTask drop collection request task @@ -370,7 +371,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.ExpireMetaCache(ctx, aliases, ts) // Update DDOperation in etcd - return t.core.setDdMsgSendFlag(true) + return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true)) } // HasCollectionReqTask has collection request task @@ -564,7 +565,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts) // Update DDOperation in etcd - return t.core.setDdMsgSendFlag(true) + return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true)) } // DropPartitionReqTask drop partition request task @@ -658,7 +659,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { //} // Update DDOperation in etcd - return t.core.setDdMsgSendFlag(true) + return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true)) } // HasPartitionReqTask has partition request task diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 8d37574e1d..f5a489ca73 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -149,7 +149,7 @@ func (t *timetickSync) sendToChannel() { if len(idleSessionList) > 0 { // give warning every 2 second if not get ttMsg from source sessions if maxCnt%10 == 0 { - log.Warn("session idle for long time", zap.Any("idle session list", idleSessionList), + log.Warn("session idle for long time", zap.Any("idle list", idleSessionList), zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.Milliseconds()*maxCnt)) } return @@ -273,11 +273,12 @@ func (t *timetickSync) delSession(sess *sessionutil.Session) { } } -func (t *timetickSync) clearSessions(sess []*sessionutil.Session) { +func (t *timetickSync) initSessions(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() for _, s := range sess { t.sess2ChanTsMap[s.ServerID] = nil + log.Debug("Init proxy sessions for timeticksync", zap.Int64("serverID", s.ServerID)) } } @@ -302,17 +303,14 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { log.Debug("timetickSync sendChan closed") return } - + if enableTtChecker { + checker.Check() + } // reduce each channel to get min timestamp local := sessTimetick[t.sourceID] if len(local.chanTsMap) == 0 { continue } - - if enableTtChecker { - checker.Check() - } - hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTsMap)) tr := timerecord.NewTimeRecorder(hdr) wg := sync.WaitGroup{}