Add time tick logic to Proxy

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
dragondriver 2020-11-26 17:47:46 +08:00 committed by yefu.chen
parent 32445dd02b
commit 72a7af3f4a
4 changed files with 17 additions and 4 deletions

View File

@ -11,7 +11,7 @@
nodeID: # will be deprecated after v0.2 nodeID: # will be deprecated after v0.2
proxyIDList: [1] proxyIDList: [0]
queryNodeIDList: [2] queryNodeIDList: [2]
writeNodeIDList: [3] writeNodeIDList: [3]

View File

@ -86,14 +86,14 @@ func TestParamTable_ProxyIDList(t *testing.T) {
Params.Init() Params.Init()
ids := Params.ProxyIDList ids := Params.ProxyIDList
assert.Equal(t, len(ids), 1) assert.Equal(t, len(ids), 1)
assert.Equal(t, ids[0], int64(1)) assert.Equal(t, ids[0], int64(0))
} }
func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) { func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) {
Params.Init() Params.Init()
names := Params.ProxyTimeTickChannelNames names := Params.ProxyTimeTickChannelNames
assert.Equal(t, len(names), 1) assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "proxyTimeTick-1") assert.Equal(t, names[0], "proxyTimeTick-0")
} }
func TestParamTable_MsgChannelSubName(t *testing.T) { func TestParamTable_MsgChannelSubName(t *testing.T) {

View File

@ -94,6 +94,8 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
return nil, err return nil, err
} }
p.tick = newTimeTick(p.proxyLoopCtx, p.tsoAllocator, time.Millisecond*200, p.sched.TaskDoneTest)
return p, nil return p, nil
} }
@ -114,6 +116,7 @@ func (p *Proxy) startProxy() error {
p.idAllocator.Start() p.idAllocator.Start()
p.tsoAllocator.Start() p.tsoAllocator.Start()
p.segAssigner.Start() p.segAssigner.Start()
p.tick.Start()
// Start callbacks // Start callbacks
for _, cb := range p.startCallbacks { for _, cb := range p.startCallbacks {
@ -184,6 +187,8 @@ func (p *Proxy) stopProxyLoop() {
p.queryMsgStream.Close() p.queryMsgStream.Close()
p.tick.Close()
p.proxyLoopWg.Wait() p.proxyLoopWg.Wait()
} }

View File

@ -70,6 +70,9 @@ func (tt *timeTick) tick() error {
} }
msgPack := msgstream.MsgPack{} msgPack := msgstream.MsgPack{}
timeTickMsg := &msgstream.TimeTickMsg{ timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{int32(Params.ProxyID())},
},
TimeTickMsg: internalpb.TimeTickMsg{ TimeTickMsg: internalpb.TimeTickMsg{
MsgType: internalpb.MsgType_kTimeTick, MsgType: internalpb.MsgType_kTimeTick,
PeerID: tt.peerID, PeerID: tt.peerID,
@ -77,7 +80,12 @@ func (tt *timeTick) tick() error {
}, },
} }
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
tt.tickMsgStream.Produce(&msgPack) err := tt.tickMsgStream.Produce(&msgPack)
if err != nil {
log.Printf("proxy send time tick error: %v", err)
} else {
log.Printf("proxy send time tick message")
}
tt.lastTick = tt.currentTick tt.lastTick = tt.currentTick
return nil return nil
} }