Remove Timetick channel (#16449)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/16521/head
Xiaofan 2022-04-17 23:47:36 +08:00 committed by GitHub
parent 35e7ed45b3
commit fce8f6cfab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 101 deletions

View File

@ -471,39 +471,7 @@ func (c *Core) setMsgStreams() error {
return fmt.Errorf("RootCoordSubName is empty")
}
// rootcoord time tick channel
if Params.CommonCfg.RootCoordTimeTick == "" {
return fmt.Errorf("timeTickChannel is empty")
}
timeTickStream, _ := c.factory.NewMsgStream(c.ctx)
metrics.RootCoordNumOfMsgStream.Inc()
timeTickStream.AsProducer([]string{Params.CommonCfg.RootCoordTimeTick})
log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.CommonCfg.RootCoordTimeTick))
c.SendTimeTick = func(t typeutil.Timestamp, reason string) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
BeginTimestamp: t,
EndTimestamp: t,
HashValues: []uint32{0},
}
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: 0,
Timestamp: t,
SourceID: c.session.ServerID,
},
}
timeTickMsg := &ms.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
if err := timeTickStream.Broadcast(&msgPack); err != nil {
return err
}
pc := c.chanTimeTick.listDmlChannels()
pt := make([]uint64, len(pc))
for i := 0; i < len(pt); i++ {
@ -520,10 +488,6 @@ func (c *Core) setMsgStreams() error {
Timestamps: pt,
DefaultTimestamp: t,
}
//log.Debug("update timetick",
// zap.Any("DefaultTs", t),
// zap.Any("sourceID", c.session.ServerID),
// zap.Any("reason", reason))
return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
}
@ -1291,7 +1255,6 @@ func (c *Core) Start() error {
}
log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID))
log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.CommonCfg.RootCoordTimeTick))
c.startOnce.Do(func() {
if err := c.proxyManager.WatchProxy(); err != nil {

View File

@ -706,11 +706,6 @@ func TestRootCoord_Base(t *testing.T) {
tmpFactory := dependency.NewDefaultFactory(true)
timeTickStream, _ := tmpFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
defer timeTickStream.Close()
dmlStream, _ := tmpFactory.NewMsgStream(ctx)
defer dmlStream.Close()
@ -738,27 +733,7 @@ func TestRootCoord_Base(t *testing.T) {
time.Sleep(100 * time.Millisecond)
shardsNum := int32(8)
fmt.Printf("hello world2")
var wg sync.WaitGroup
wg.Add(1)
t.Run("time tick", func(t *testing.T) {
defer wg.Done()
ttmsg, ok := <-timeTickStream.Chan()
assert.True(t, ok)
assert.Equal(t, 1, len(ttmsg.Msgs))
ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg)
assert.True(t, ok)
assert.Greater(t, ttm.Base.Timestamp, uint64(0))
t.Log(ttm.Base.Timestamp)
ttmsg2, ok := <-timeTickStream.Chan()
assert.True(t, ok)
assert.Equal(t, 1, len(ttmsg2.Msgs))
ttm2, ok := (ttmsg2.Msgs[0]).(*msgstream.TimeTickMsg)
assert.True(t, ok)
assert.Greater(t, ttm2.Base.Timestamp, uint64(0))
assert.Equal(t, ttm2.Base.Timestamp, ttm.Base.Timestamp+1)
})
wg.Add(1)
t.Run("create collection", func(t *testing.T) {
@ -2077,14 +2052,9 @@ func TestRootCoord_Base(t *testing.T) {
defer wg.Done()
const (
proxyIDInvalid = 102
proxyName0 = "proxy_0"
proxyName1 = "proxy_1"
chanName0 = "c0"
chanName1 = "c1"
chanName2 = "c2"
ts0 = uint64(100)
ts1 = uint64(120)
ts2 = uint64(150)
ts0 = uint64(20)
ts1 = uint64(40)
ts2 = uint64(60)
)
numChan := core.chanTimeTick.getDmlChannelNum()
p1 := sessionutil.Session{
@ -2118,12 +2088,41 @@ func TestRootCoord_Base(t *testing.T) {
dn2 := core.chanTimeTick.getDeltaChannelName()
core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2)
// wait for local channel reported
for {
core.chanTimeTick.lock.Lock()
_, ok := core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn0]
if !ok {
core.chanTimeTick.lock.Unlock()
time.Sleep(100 * time.Millisecond)
continue
}
_, ok = core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn1]
if !ok {
core.chanTimeTick.lock.Unlock()
time.Sleep(100 * time.Millisecond)
continue
}
_, ok = core.chanTimeTick.sess2ChanTsMap[core.session.ServerID].chanTsMap[cn2]
if !ok {
core.chanTimeTick.lock.Unlock()
time.Sleep(100 * time.Millisecond)
continue
}
core.chanTimeTick.lock.Unlock()
break
}
msg0 := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
SourceID: 100,
},
ChannelNames: []string{chanName0, chanName1},
ChannelNames: []string{cn0, cn1},
Timestamps: []uint64{ts0, ts2},
}
s, _ := core.UpdateChannelTimeTick(ctx, msg0)
@ -2136,7 +2135,7 @@ func TestRootCoord_Base(t *testing.T) {
MsgType: commonpb.MsgType_TimeTick,
SourceID: 101,
},
ChannelNames: []string{chanName1, chanName2},
ChannelNames: []string{cn1, cn2},
Timestamps: []uint64{ts1, ts2},
}
s, _ = core.UpdateChannelTimeTick(ctx, msg1)
@ -2666,24 +2665,9 @@ func TestRootCoord2(t *testing.T) {
err = core.Register()
assert.NoError(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
var wg sync.WaitGroup
wg.Add(1)
t.Run("time tick", func(t *testing.T) {
defer wg.Done()
ttmsg, ok := <-timeTickStream.Chan()
assert.True(t, ok)
assert.Equal(t, 1, len(ttmsg.Msgs))
ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg)
assert.True(t, ok)
assert.Greater(t, ttm.Base.Timestamp, typeutil.Timestamp(0))
})
wg.Add(1)
t.Run("create collection", func(t *testing.T) {
defer wg.Done()
@ -2960,10 +2944,6 @@ func TestCheckFlushedSegments(t *testing.T) {
err = core.Register()
assert.NoError(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
var wg sync.WaitGroup
@ -3120,10 +3100,6 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
err = core.Register()
assert.NoError(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
modifyFunc := func(collInfo *etcdpb.CollectionInfo) {

View File

@ -219,7 +219,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
return nil
}
if len(in.Timestamps) != len(in.ChannelNames) {
return fmt.Errorf("invalid TimeTickMsg")
return fmt.Errorf("invalid TimeTickMsg, timestamp and channelname size mismatch")
}
prev, ok := t.sess2ChanTsMap[in.Base.SourceID]
@ -230,7 +230,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
// if ddl operation not finished, skip current ts update
ddlMinTs := t.getDdlMinTimeTick()
if in.DefaultTimestamp > ddlMinTs {
log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID),
log.Info("ddl not finished", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
zap.Uint64("ddlMinTs", ddlMinTs),
zap.String("reason", reason))
@ -239,7 +239,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
if in.Base.SourceID == t.sourceID {
if prev != nil && in.DefaultTimestamp <= prev.defaultTs {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID),
log.Warn("timestamp go back", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
zap.Uint64("prev ts", prev.defaultTs),
zap.String("reason", reason))
@ -252,7 +252,6 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
} else {
t.sess2ChanTsMap[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1)
}
t.sendToChannel()
return nil
}
@ -326,7 +325,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
}
}
if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil {
log.Debug("SendTimeTickToChannel fail", zap.Error(err))
log.Warn("SendTimeTickToChannel fail", zap.Error(err))
}
wg.Done()
}(chanName, ts)
@ -364,7 +363,6 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
if err := t.dmlChannels.broadcast(chanNames, &msgPack); err != nil {
return err
}

View File

@ -193,6 +193,7 @@ func (p *commonConfig) initProxySubName() {
}
// --- rootcoord ---
// Deprecate
func (p *commonConfig) initRootCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.rootCoordTimeTick",
@ -242,6 +243,7 @@ func (p *commonConfig) initQueryCoordSearch() {
p.QueryCoordSearch = p.initChanNamePrefix(keys)
}
// Deprecated, search result use grpc instead of a result channel
func (p *commonConfig) initQueryCoordSearchResult() {
keys := []string{
"common.chanNamePrefix.searchResult",
@ -250,6 +252,7 @@ func (p *commonConfig) initQueryCoordSearchResult() {
p.QueryCoordSearchResult = p.initChanNamePrefix(keys)
}
// Deprecate
func (p *commonConfig) initQueryCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.queryTimeTick",
@ -284,6 +287,7 @@ func (p *commonConfig) initDataCoordStatistic() {
p.DataCoordStatistic = p.initChanNamePrefix(keys)
}
// Deprecate
func (p *commonConfig) initDataCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.dataCoordTimeTick",