From af66a0b621d14f6e4b4a5ebb2e9a9816af912762 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 14 Nov 2022 14:41:11 +0800 Subject: [PATCH] Fix Time Tick session2ChanTs map use the same nodeID (#20537) Signed-off-by: xiaofan-luan Signed-off-by: xiaofan-luan --- cmd/components/root_coord.go | 2 +- internal/querynode/query_node.go | 1 - internal/rootcoord/proxy_manager.go | 26 --------- internal/rootcoord/root_coord.go | 8 ++- internal/rootcoord/root_coord_test.go | 4 ++ internal/rootcoord/timeticksync.go | 23 +++----- internal/rootcoord/timeticksync_test.go | 66 +++++++++++++++++++++-- internal/util/sessionutil/session_util.go | 2 + 8 files changed, 83 insertions(+), 49 deletions(-) diff --git a/cmd/components/root_coord.go b/cmd/components/root_coord.go index 71fe16fed0..f12705969e 100644 --- a/cmd/components/root_coord.go +++ b/cmd/components/root_coord.go @@ -58,7 +58,7 @@ func (rc *RootCoord) Run() error { log.Error("RootCoord starts error", zap.Error(err)) return err } - log.Debug("RootCoord successfully started") + log.Info("RootCoord successfully started") return nil } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ee472cd097..d49367ee76 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -150,7 +150,6 @@ func (node *QueryNode) initSession() error { } node.session.Init(typeutil.QueryNodeRole, node.address, false, true) paramtable.SetNodeID(node.session.ServerID) - log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.Address)) return nil } diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index 9863d868ff..0cd92aaeab 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -207,29 +207,3 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se func (p *proxyManager) Stop() { p.cancel() } - -// listProxyInEtcd helper function lists proxy in etcd -func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sessionutil.Session, error) { - ctx2, cancel := context.WithTimeout(ctx, rootcoord.RequestTimeout) - defer cancel() - resp, err := cli.Get( - ctx2, - path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), - clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), - ) - if err != nil { - return nil, fmt.Errorf("list proxy failed, etcd error = %w", err) - } - sess := make(map[int64]*sessionutil.Session) - for _, v := range resp.Kvs { - var s sessionutil.Session - err := json.Unmarshal(v.Value, &s) - if err != nil { - log.Debug("unmarshal SvrSession failed", zap.Error(err)) - continue - } - sess[s.ServerID] = &s - } - return sess, nil -} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 3721586b4b..f4303c77d9 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -187,7 +187,7 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error { Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), commonpbutil.WithTimeStamp(t), - commonpbutil.WithSourceID(c.session.ServerID), + commonpbutil.WithSourceID(ddlSourceID), ), ChannelNames: pc, Timestamps: pt, @@ -197,6 +197,11 @@ func (c *Core) sendTimeTick(t Timestamp, reason string) error { } func (c *Core) sendMinDdlTsAsTt() { + code := c.stateCode.Load().(commonpb.StateCode) + if code != commonpb.StateCode_Healthy { + log.Warn("rootCoord is not healthy, skip send timetick") + return + } minBgDdlTs := c.ddlTsLockManager.GetMinDdlTs() minNormalDdlTs := c.scheduler.GetMinDdlTs() minDdlTs := funcutil.Min(minBgDdlTs, minNormalDdlTs) @@ -452,7 +457,6 @@ func (c *Core) initInternal() error { chanMap := c.meta.ListCollectionPhysicalChannels() c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap) - c.chanTimeTick.addSession(c.session) c.proxyClientManager = newProxyClientManager(c.proxyCreator) c.broker = newServerBroker(c) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index dc48b810a5..871e452005 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1276,6 +1276,9 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) { withTtSynchronizer(ticker), withDdlTsLockManager(ddlManager), withScheduler(sched)) + + c.stateCode.Store(commonpb.StateCode_Healthy) + c.session.ServerID = TestRootCoordID c.sendMinDdlTsAsTt() // no session. ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID}) c.sendMinDdlTsAsTt() @@ -1311,6 +1314,7 @@ func TestCore_startTimeTickLoop(t *testing.T) { c.ctx = ctx Params.ProxyCfg.TimeTickInterval = time.Millisecond c.wg.Add(1) + c.UpdateStateCode(commonpb.StateCode_Initializing) go c.startTimeTickLoop() time.Sleep(time.Millisecond * 4) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index a596ed6079..6d5c4a7101 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -41,6 +41,7 @@ var ( timeTickSyncTtInterval = 2 * time.Minute ttCheckerName = "rootTtChecker" ttCheckerWarnMsg = fmt.Sprintf("RootCoord haven't synchronized the time tick for %f minutes", timeTickSyncTtInterval.Minutes()) + ddlSourceID = UniqueID(-1) ) type ttHistogram struct { @@ -133,9 +134,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact // sendToChannel send all channels' timetick to sendChan // lock is needed by the invoker -func (t *timetickSync) sendToChannel() { +func (t *timetickSync) sendToChannel() bool { if len(t.sess2ChanTsMap) == 0 { - return + return false } // detect whether rootcoord receives ttMsg from all source sessions @@ -157,7 +158,7 @@ func (t *timetickSync) sendToChannel() { log.Warn("session idle for long time", zap.Any("idle list", idleSessionList), zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.Milliseconds()*maxCnt)) } - return + return false } // clear sess2ChanTsMap and send a clone @@ -167,6 +168,7 @@ func (t *timetickSync) sendToChannel() { t.sess2ChanTsMap[k] = nil } t.sendChan <- ptt + return true } // UpdateTimeTick check msg validation and send it to local channel @@ -185,16 +187,6 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason return fmt.Errorf("skip ChannelTimeTickMsg from un-recognized session %d", in.Base.SourceID) } - if in.Base.SourceID == t.sourceID { - if prev != nil && in.DefaultTimestamp <= prev.defaultTs { - log.Warn("timestamp go back", zap.Int64("source id", in.Base.SourceID), - zap.Uint64("curr ts", in.DefaultTimestamp), - zap.Uint64("prev ts", prev.defaultTs), - zap.String("reason", reason)) - return nil - } - } - if prev == nil { t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, 1) } else { @@ -225,7 +217,8 @@ func (t *timetickSync) initSessions(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() t.sess2ChanTsMap = make(map[typeutil.UniqueID]*chanTsMsg) - t.sess2ChanTsMap[t.sourceID] = nil + // Init DDL source + t.sess2ChanTsMap[ddlSourceID] = nil for _, s := range sess { t.sess2ChanTsMap[s.ServerID] = nil log.Info("Init proxy sessions for timeticksync", zap.Int64("serverID", s.ServerID)) @@ -257,7 +250,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { checker.Check() } // reduce each channel to get min timestamp - local := sessTimetick[t.sourceID] + local := sessTimetick[ddlSourceID] if len(local.chanTsMap) == 0 { continue } diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index f7fbb1f7db..b86132df7f 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -28,12 +28,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/sessionutil" ) func TestTimetickSync(t *testing.T) { ctx := context.Background() sourceID := int64(100) - factory := dependency.NewDefaultFactory(true) //chanMap := map[typeutil.UniqueID][]string{ @@ -49,10 +49,12 @@ func TestTimetickSync(t *testing.T) { wg.Add(1) t.Run("sendToChannel", func(t *testing.T) { defer wg.Done() - ttSync.sendToChannel() + result := ttSync.sendToChannel() + assert.False(t, result) ttSync.sess2ChanTsMap[1] = nil - ttSync.sendToChannel() + result = ttSync.sendToChannel() + assert.False(t, result) msg := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ @@ -60,7 +62,8 @@ func TestTimetickSync(t *testing.T) { }, } ttSync.sess2ChanTsMap[1] = newChanTsMsg(msg, 1) - ttSync.sendToChannel() + result = ttSync.sendToChannel() + assert.True(t, result) }) wg.Add(1) @@ -107,6 +110,61 @@ func TestTimetickSync(t *testing.T) { wg.Wait() } +func TestMultiTimetickSync(t *testing.T) { + ctx := context.Background() + + factory := dependency.NewDefaultFactory(true) + + //chanMap := map[typeutil.UniqueID][]string{ + // int64(1): {"rootcoord-dml_0"}, + //} + + Params.RootCoordCfg.DmlChannelNum = 1 + Params.CommonCfg.RootCoordDml = "rootcoord-dml" + Params.CommonCfg.RootCoordDelta = "rootcoord-delta" + ttSync := newTimeTickSync(ctx, UniqueID(0), factory, nil) + + var wg sync.WaitGroup + + wg.Add(1) + t.Run("UpdateTimeTick", func(t *testing.T) { + defer wg.Done() + + // suppose this is rooit + ttSync.addSession(&sessionutil.Session{ServerID: 1}) + + // suppose this is proxy1 + ttSync.addSession(&sessionutil.Session{ServerID: 2}) + + msg := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + SourceID: int64(1), + }, + DefaultTimestamp: 100, + } + + err := ttSync.updateTimeTick(msg, "1") + assert.Nil(t, err) + + msg2 := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + SourceID: int64(2), + }, + DefaultTimestamp: 102, + } + err = ttSync.updateTimeTick(msg2, "2") + assert.Nil(t, err) + + // make sure result works + result := <-ttSync.sendChan + assert.True(t, len(result) == 2) + }) + + wg.Wait() +} + func Test_ttHistogram_get(t *testing.T) { h := newTtHistogram() assert.Equal(t, typeutil.ZeroTimestamp, h.get("not_exist")) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index f3ed2fcfbc..d521b740bc 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -229,6 +229,7 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b panic(err) } s.ServerID = serverID + log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID)) } // String makes Session struct able to be logged by zap @@ -252,6 +253,7 @@ func (s *Session) getServerID() (int64, error) { serverIDMu.Lock() defer serverIDMu.Unlock() + // Notice, For standalone, all process share the same nodeID. nodeID := paramtable.GetNodeID() if nodeID != 0 { return nodeID, nil