Move dmlChannel and deltaChannel into timeticksync (#12254)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/12254/merge
Cai Yudong 2021-11-25 10:07:15 +08:00 committed by GitHub
parent 8a8ebed23d
commit 858e95f377
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 274 additions and 221 deletions

View File

@ -12,6 +12,7 @@
package rootcoord
import (
"context"
"fmt"
"sync"
@ -29,16 +30,18 @@ type dmlMsgStream struct {
}
type dmlChannels struct {
core *Core
ctx context.Context
factory msgstream.Factory
namePrefix string
capacity int64
idx *atomic.Int64
pool sync.Map
}
func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels {
func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefix string, chanNum int64) *dmlChannels {
d := &dmlChannels{
core: c,
ctx: ctx,
factory: factory,
namePrefix: chanNamePrefix,
capacity: chanNum,
idx: atomic.NewInt64(0),
@ -46,8 +49,8 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels
}
for i := int64(0); i < chanNum; i++ {
name := getDmlChannelName(d.namePrefix, i)
ms, err := c.msFactory.NewMsgStream(c.ctx)
name := genChannelName(d.namePrefix, i)
ms, err := factory.NewMsgStream(ctx)
if err != nil {
log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err))
panic("Failed to add msgstream")
@ -62,13 +65,12 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels
return d
}
func (d *dmlChannels) GetDmlMsgStreamName() string {
func (d *dmlChannels) getChannelName() string {
cnt := d.idx.Inc()
return getDmlChannelName(d.namePrefix, (cnt-1)%d.capacity)
return genChannelName(d.namePrefix, (cnt-1)%d.capacity)
}
// ListPhysicalChannels lists all dml channel names
func (d *dmlChannels) ListPhysicalChannels() []string {
func (d *dmlChannels) listChannels() []string {
var chanNames []string
d.pool.Range(
func(k, v interface{}) bool {
@ -83,13 +85,11 @@ func (d *dmlChannels) ListPhysicalChannels() []string {
return chanNames
}
// GetNumChannels get current dml channel count
func (d *dmlChannels) GetPhysicalChannelNum() int {
return len(d.ListPhysicalChannels())
func (d *dmlChannels) getChannelNum() int {
return len(d.listChannels())
}
// Broadcast broadcasts msg pack into specified channel
func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error {
func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error {
for _, chanName := range chanNames {
v, ok := d.pool.Load(chanName)
if !ok {
@ -110,8 +110,7 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err
return nil
}
// BroadcastMark broadcasts msg pack into specified channel and returns related message id
func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) {
func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) {
result := make(map[string][]byte)
for _, chanName := range chanNames {
v, ok := d.pool.Load(chanName)
@ -140,8 +139,7 @@ func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack)
return result, nil
}
// AddProducerChannels add named channels as producer
func (d *dmlChannels) AddProducerChannels(names ...string) {
func (d *dmlChannels) addChannels(names ...string) {
for _, name := range names {
v, ok := d.pool.Load(name)
if !ok {
@ -159,8 +157,7 @@ func (d *dmlChannels) AddProducerChannels(names ...string) {
}
}
// RemoveProducerChannels removes specified channels
func (d *dmlChannels) RemoveProducerChannels(names ...string) {
func (d *dmlChannels) removeChannels(names ...string) {
for _, name := range names {
v, ok := d.pool.Load(name)
if !ok {
@ -180,6 +177,6 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) {
}
}
func getDmlChannelName(prefix string, idx int64) string {
func genChannelName(prefix string, idx int64) string {
return fmt.Sprintf("%s_%d", prefix, idx)
}

View File

@ -38,39 +38,36 @@ func TestDmlChannels(t *testing.T) {
err := factory.SetParams(m)
assert.Nil(t, err)
core, err := NewCore(ctx, factory)
assert.Nil(t, err)
dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum)
chanNames := dml.ListPhysicalChannels()
dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum)
chanNames := dml.listChannels()
assert.Equal(t, 0, len(chanNames))
randStr := funcutil.RandomString(8)
assert.Panics(t, func() { dml.AddProducerChannels(randStr) })
assert.Panics(t, func() { dml.Broadcast([]string{randStr}, nil) })
assert.Panics(t, func() { dml.BroadcastMark([]string{randStr}, nil) })
assert.Panics(t, func() { dml.RemoveProducerChannels(randStr) })
assert.Panics(t, func() { dml.addChannels(randStr) })
assert.Panics(t, func() { dml.broadcast([]string{randStr}, nil) })
assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) })
assert.Panics(t, func() { dml.removeChannels(randStr) })
// dml_xxx_0 => {chanName0, chanName2}
// dml_xxx_1 => {chanName1}
chanName0 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName0)
assert.Equal(t, 1, dml.GetPhysicalChannelNum())
chanName0 := dml.getChannelName()
dml.addChannels(chanName0)
assert.Equal(t, 1, dml.getChannelNum())
chanName1 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName1)
assert.Equal(t, 2, dml.GetPhysicalChannelNum())
chanName1 := dml.getChannelName()
dml.addChannels(chanName1)
assert.Equal(t, 2, dml.getChannelNum())
chanName2 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName2)
assert.Equal(t, 2, dml.GetPhysicalChannelNum())
chanName2 := dml.getChannelName()
dml.addChannels(chanName2)
assert.Equal(t, 2, dml.getChannelNum())
dml.RemoveProducerChannels(chanName0)
assert.Equal(t, 2, dml.GetPhysicalChannelNum())
dml.removeChannels(chanName0)
assert.Equal(t, 2, dml.getChannelNum())
dml.RemoveProducerChannels(chanName1)
assert.Equal(t, 1, dml.GetPhysicalChannelNum())
dml.removeChannels(chanName1)
assert.Equal(t, 1, dml.getChannelNum())
dml.RemoveProducerChannels(chanName0)
assert.Equal(t, 0, dml.GetPhysicalChannelNum())
dml.removeChannels(chanName0)
assert.Equal(t, 0, dml.getChannelNum())
}

View File

@ -129,12 +129,6 @@ type Core struct {
CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error
// dml channels used for insert
dmlChannels *dmlChannels
// delta channels used for delete
deltaChannels *dmlChannels
//Proxy manager
proxyManager *proxyManager
@ -482,8 +476,7 @@ func (c *Core) setMsgStreams() error {
}
metrics.RootCoordDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t)))
//c.dmlChannels.BroadcastAll(&msgPack)
pc := c.dmlChannels.ListPhysicalChannels()
pc := c.chanTimeTick.listDmlChannels()
pt := make([]uint64, len(pc))
for i := 0; i < len(pt); i++ {
pt[i] = t
@ -503,7 +496,7 @@ func (c *Core) setMsgStreams() error {
// zap.Any("DefaultTs", t),
// zap.Any("sourceID", c.session.ServerID),
// zap.Any("reason", reason))
return c.chanTimeTick.UpdateTimeTick(&ttMsg, reason)
return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
}
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error) {
@ -519,7 +512,7 @@ func (c *Core) setMsgStreams() error {
CreateCollectionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, msg)
return c.dmlChannels.BroadcastMark(channelNames, &msgPack)
return c.chanTimeTick.broadcastMarkDmlChannels(channelNames, &msgPack)
}
c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error {
@ -535,7 +528,7 @@ func (c *Core) setMsgStreams() error {
DropCollectionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, msg)
return c.dmlChannels.Broadcast(channelNames, &msgPack)
return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
}
c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error {
@ -551,7 +544,7 @@ func (c *Core) setMsgStreams() error {
CreatePartitionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, msg)
return c.dmlChannels.Broadcast(channelNames, &msgPack)
return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
}
c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error {
@ -567,7 +560,7 @@ func (c *Core) setMsgStreams() error {
DropPartitionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, msg)
return c.dmlChannels.Broadcast(channelNames, &msgPack)
return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
}
return nil
@ -1007,46 +1000,23 @@ func (c *Core) Init() error {
return
}
// initialize dml channels used for insert
c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
c.deltaChannels = newDmlChannels(c, Params.DeltaChannelName, Params.DmlChannelNum)
// recover physical channels for all collections
chanMap := c.MetaTable.ListCollectionPhysicalChannels()
for collID, chanNames := range chanMap {
c.dmlChannels.AddProducerChannels(chanNames...)
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames))
// TODO: convert physical channel name to delta channel name
for _, chanName := range chanNames {
deltaChanName, err := ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
return
}
c.deltaChannels.AddProducerChannels(deltaChanName)
log.Debug("recover delta channels", zap.Int64("collID", collID), zap.String("deltaChanName", deltaChanName))
}
}
c.chanTimeTick = newTimeTickSync(c)
c.chanTimeTick.AddProxy(c.session)
c.chanTimeTick = newTimeTickSync(c.ctx, c.session, c.msFactory, chanMap)
c.chanTimeTick.addProxy(c.session)
c.proxyClientManager = newProxyClientManager(c)
log.Debug("RootCoord, set proxy manager")
c.proxyManager, initError = newProxyManager(
c.ctx,
Params.EtcdEndpoints,
c.chanTimeTick.GetProxy,
c.chanTimeTick.getProxy,
c.proxyClientManager.GetProxyClients,
)
if initError != nil {
return
}
c.proxyManager.AddSession(c.chanTimeTick.AddProxy, c.proxyClientManager.AddProxyClient)
c.proxyManager.DelSession(c.chanTimeTick.DelProxy, c.proxyClientManager.DelProxyClient)
c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient)
c.proxyManager.DelSession(c.chanTimeTick.delProxy, c.proxyClientManager.DelProxyClient)
c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
@ -1197,7 +1167,7 @@ func (c *Core) Start() error {
c.wg.Add(4)
go c.startTimeTickLoop()
go c.tsLoop()
go c.chanTimeTick.StartWatch(&c.wg)
go c.chanTimeTick.startWatch(&c.wg)
go c.checkFlushedSegmentsLoop()
go c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
@ -1839,7 +1809,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel
msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())]
return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil
}
err := c.chanTimeTick.UpdateTimeTick(in, "gRPC")
err := c.chanTimeTick.updateTimeTick(in, "gRPC")
if err != nil {
log.Error("UpdateTimeTick failed", zap.String("role", Params.RoleName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))

View File

@ -327,7 +327,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
vchanNames := make([]string, t.ShardsNum)
chanNames := make([]string, t.ShardsNum)
for i := int32(0); i < t.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.dmlChannels.GetDmlMsgStreamName(), collID, i)
vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.chanTimeTick.getDmlChannelName(), collID, i)
chanNames[i] = ToPhysicalChannel(vchanNames[i])
}
@ -389,9 +389,9 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
core.ddlLock.Lock()
defer core.ddlLock.Unlock()
core.chanTimeTick.AddDdlTimeTick(ts, reason)
core.chanTimeTick.addDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
defer core.chanTimeTick.removeDdlTimeTick(ts, reason)
err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
@ -680,7 +680,7 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Equal(t, shardsNum, int32(core.dmlChannels.GetPhysicalChannelNum()))
assert.Equal(t, shardsNum, int32(core.chanTimeTick.getDmlChannelNum()))
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
@ -1713,7 +1713,7 @@ func TestRootCoord(t *testing.T) {
ts1 = uint64(120)
ts2 = uint64(150)
)
numChan := core.chanTimeTick.GetChanNum()
numChan := core.chanTimeTick.getDmlChannelNum()
p1 := sessionutil.Session{
ServerID: 100,
}
@ -1735,15 +1735,15 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
cn0 := core.dmlChannels.GetDmlMsgStreamName()
cn1 := core.dmlChannels.GetDmlMsgStreamName()
cn2 := core.dmlChannels.GetDmlMsgStreamName()
core.dmlChannels.AddProducerChannels(cn0, cn1, cn2)
cn0 := core.chanTimeTick.getDmlChannelName()
cn1 := core.chanTimeTick.getDmlChannelName()
cn2 := core.chanTimeTick.getDmlChannelName()
core.chanTimeTick.addDmlChannels(cn0, cn1, cn2)
dn0 := core.deltaChannels.GetDmlMsgStreamName()
dn1 := core.deltaChannels.GetDmlMsgStreamName()
dn2 := core.deltaChannels.GetDmlMsgStreamName()
core.deltaChannels.AddProducerChannels(dn0, dn1, dn2)
dn0 := core.chanTimeTick.getDeltaChannelName()
dn1 := core.chanTimeTick.getDeltaChannelName()
dn2 := core.chanTimeTick.getDeltaChannelName()
core.chanTimeTick.addDeltaChannels(dn0, dn1, dn2)
msg0 := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
@ -1783,10 +1783,10 @@ func TestRootCoord(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// 2 proxy, 1 rootcoord
assert.Equal(t, 3, core.chanTimeTick.GetProxyNum())
assert.Equal(t, 3, core.chanTimeTick.getProxyNum())
// add 3 proxy channels
assert.Equal(t, 3, core.chanTimeTick.GetChanNum()-numChan)
assert.Equal(t, 3, core.chanTimeTick.getDmlChannelNum()-numChan)
_, err = core.etcdCli.Delete(ctx2, proxy1)
assert.Nil(t, err)

View File

@ -136,10 +136,10 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
chanNames := make([]string, t.Req.ShardsNum)
deltaChanNames := make([]string, t.Req.ShardsNum)
for i := int32(0); i < t.Req.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.dmlChannels.GetDmlMsgStreamName(), collID, i)
vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.chanTimeTick.getDmlChannelName(), collID, i)
chanNames[i] = ToPhysicalChannel(vchanNames[i])
deltaChanNames[i] = t.core.deltaChannels.GetDmlMsgStreamName()
deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName()
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.DmlChannelName, Params.DeltaChannelName)
if err1 != nil || deltaChanName != deltaChanNames[i] {
return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i])
@ -200,15 +200,15 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
t.core.chanTimeTick.addDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
// add dml channel before send dd msg
t.core.dmlChannels.AddProducerChannels(chanNames...)
t.core.chanTimeTick.addDmlChannels(chanNames...)
// also add delta channels
t.core.deltaChannels.AddProducerChannels(deltaChanNames...)
t.core.chanTimeTick.addDeltaChannels(deltaChanNames...)
ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil {
@ -222,13 +222,13 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
t.core.dmlChannels.RemoveProducerChannels(chanNames...)
t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...)
t.core.chanTimeTick.removeDmlChannels(chanNames...)
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
return fmt.Errorf("meta table add collection failed,error = %w", err)
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
@ -306,9 +306,9 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
t.core.chanTimeTick.addDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr)
if err != nil {
@ -320,24 +320,23 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
// send tt into deleted channels to tell data_node to clear flowgragh
t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts)
t.core.chanTimeTick.sendTimeTickToChannel(collMeta.PhysicalChannelNames, ts)
// remove dml channel after send dd msg
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
t.core.chanTimeTick.removeDmlChannels(collMeta.PhysicalChannelNames...)
// remove delta channels
deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames))
for i, chanName := range collMeta.PhysicalChannelNames {
deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
if deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName); err != nil {
return err
}
}
t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...)
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
return nil
}
@ -522,9 +521,9 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
t.core.chanTimeTick.addDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr)
if err != nil {
@ -536,7 +535,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
}
@ -607,9 +606,9 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
t.core.chanTimeTick.addDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr)
if err != nil {
@ -621,7 +620,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
}

View File

@ -12,6 +12,7 @@
package rootcoord
import (
"context"
"fmt"
"math"
"sync"
@ -29,7 +30,12 @@ import (
)
type timetickSync struct {
core *Core
ctx context.Context
session *sessionutil.Session
dmlChannels *dmlChannels // used for insert
deltaChannels *dmlChannels // used for delete
lock sync.Mutex
proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg
@ -64,10 +70,38 @@ func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp
return c.in.DefaultTimestamp
}
func newTimeTickSync(core *Core) *timetickSync {
func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.DmlChannelName, Params.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
deltaChannels := newDmlChannels(ctx, factory, Params.DeltaChannelName, Params.DmlChannelNum)
// recover physical channels for all collections
for collID, chanNames := range chanMap {
dmlChannels.addChannels(chanNames...)
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames))
var err error
deltaChanNames := make([]string, len(chanNames))
for i, chanName := range chanNames {
deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
panic("invalid dml channel name " + chanName)
}
}
deltaChannels.addChannels(deltaChanNames...)
log.Debug("recover delta channels", zap.Int64("collID", collID), zap.Any("delta channels", deltaChanNames))
}
return &timetickSync{
ctx: ctx,
session: session,
dmlChannels: dmlChannels,
deltaChannels: deltaChannels,
lock: sync.Mutex{},
core: core,
proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg),
sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16),
@ -99,7 +133,7 @@ func (t *timetickSync) sendToChannel() {
// AddDmlTimeTick add ts into ddlTimetickInfos[sourceID],
// can be used to tell if DDL operation is in process.
func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) {
func (t *timetickSync) addDdlTimeTick(ts typeutil.Timestamp, reason string) {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
@ -114,7 +148,7 @@ func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) {
// RemoveDdlTimeTick is invoked in UpdateTimeTick.
// It clears the ts generated by AddDdlTimeTick, indicates DDL operation finished.
func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) {
func (t *timetickSync) removeDdlTimeTick(ts typeutil.Timestamp, reason string) {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
@ -136,7 +170,7 @@ func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) {
}
}
func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp {
func (t *timetickSync) getDdlMinTimeTick() typeutil.Timestamp {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
@ -144,7 +178,7 @@ func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp {
}
// UpdateTimeTick check msg validation and send it to local channel
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error {
func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error {
t.lock.Lock()
defer t.lock.Unlock()
if len(in.ChannelNames) == 0 && in.DefaultTimestamp == 0 {
@ -160,7 +194,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
}
// if ddl operation not finished, skip current ts update
ddlMinTs := t.GetDdlMinTimeTick()
ddlMinTs := t.getDdlMinTimeTick()
if in.DefaultTimestamp > ddlMinTs {
log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
@ -169,7 +203,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
return nil
}
if in.Base.SourceID == t.core.session.ServerID {
if in.Base.SourceID == t.session.ServerID {
if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
@ -193,14 +227,14 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
return nil
}
func (t *timetickSync) AddProxy(sess *sessionutil.Session) {
func (t *timetickSync) addProxy(sess *sessionutil.Session) {
t.lock.Lock()
defer t.lock.Unlock()
t.proxyTimeTick[sess.ServerID] = nil
log.Debug("Add proxy for timeticksync", zap.Int64("serverID", sess.ServerID))
}
func (t *timetickSync) DelProxy(sess *sessionutil.Session) {
func (t *timetickSync) delProxy(sess *sessionutil.Session) {
t.lock.Lock()
defer t.lock.Unlock()
if _, ok := t.proxyTimeTick[sess.ServerID]; ok {
@ -210,7 +244,7 @@ func (t *timetickSync) DelProxy(sess *sessionutil.Session) {
}
}
func (t *timetickSync) GetProxy(sess []*sessionutil.Session) {
func (t *timetickSync) getProxy(sess []*sessionutil.Session) {
t.lock.Lock()
defer t.lock.Unlock()
for _, s := range sess {
@ -219,12 +253,12 @@ func (t *timetickSync) GetProxy(sess []*sessionutil.Session) {
}
// StartWatch watch proxy node change and process all channels' timetick msg
func (t *timetickSync) StartWatch(wg *sync.WaitGroup) {
func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-t.core.ctx.Done():
log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err()))
case <-t.ctx.Done():
log.Debug("rootcoord context done", zap.Error(t.ctx.Err()))
return
case proxyTimetick, ok := <-t.sendChan:
if !ok {
@ -233,7 +267,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) {
}
// reduce each channel to get min timestamp
local := proxyTimetick[t.core.session.ServerID]
local := proxyTimetick[t.session.ServerID]
if len(local.in.ChannelNames) == 0 {
continue
}
@ -251,7 +285,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) {
mints = ts
}
}
if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil {
if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil {
log.Debug("SendTimeTickToChannel fail", zap.Error(err))
}
wg.Done()
@ -270,7 +304,7 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) {
}
// SendTimeTickToChannel send each channel's min timetick to msg stream
func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error {
func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error {
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: ts,
@ -282,7 +316,7 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim
MsgType: commonpb.MsgType_TimeTick,
MsgID: 0,
Timestamp: ts,
SourceID: t.core.session.ServerID,
SourceID: t.session.ServerID,
},
}
timeTickMsg := &msgstream.TimeTickMsg{
@ -291,7 +325,7 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil {
if err := t.dmlChannels.broadcast(chanNames, &msgPack); err != nil {
return err
}
@ -302,15 +336,62 @@ func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Tim
}
// GetProxyNum return the num of detected proxy node
func (t *timetickSync) GetProxyNum() int {
func (t *timetickSync) getProxyNum() int {
t.lock.Lock()
defer t.lock.Unlock()
return len(t.proxyTimeTick)
}
// GetChanNum return the num of channel
func (t *timetickSync) GetChanNum() int {
return t.core.dmlChannels.GetPhysicalChannelNum()
///////////////////////////////////////////////////////////////////////////////
// GetDmlChannelName return a valid dml channel name
func (t *timetickSync) getDmlChannelName() string {
return t.dmlChannels.getChannelName()
}
// GetDmlChannelNum return the num of dml channels
func (t *timetickSync) getDmlChannelNum() int {
return t.dmlChannels.getChannelNum()
}
// ListDmlChannels return all in-use dml channel names
func (t *timetickSync) listDmlChannels() []string {
return t.dmlChannels.listChannels()
}
// AddDmlChannels add dml channels
func (t *timetickSync) addDmlChannels(names ...string) {
t.dmlChannels.addChannels(names...)
}
// RemoveDmlChannels remove dml channels
func (t *timetickSync) removeDmlChannels(names ...string) {
t.dmlChannels.removeChannels(names...)
}
// BroadcastDmlChannels broadcasts msg pack into dml channels
func (t *timetickSync) broadcastDmlChannels(chanNames []string, pack *msgstream.MsgPack) error {
return t.dmlChannels.broadcast(chanNames, pack)
}
// BroadcastMarkDmlChannels broadcasts msg pack into dml channels
func (t *timetickSync) broadcastMarkDmlChannels(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) {
return t.dmlChannels.broadcastMark(chanNames, pack)
}
///////////////////////////////////////////////////////////////////////////////
// GetDeltaChannelName return a valid delta channel name
func (t *timetickSync) getDeltaChannelName() string {
return t.deltaChannels.getChannelName()
}
// AddDeltaChannels add delta channels
func (t *timetickSync) addDeltaChannels(names ...string) {
t.deltaChannels.addChannels(names...)
}
// RemoveDeltaChannels remove delta channels
func (t *timetickSync) removeDeltaChannels(names ...string) {
t.deltaChannels.removeChannels(names...)
}
func minTimeTick(tt ...typeutil.Timestamp) typeutil.Timestamp {

View File

@ -13,90 +13,99 @@ package rootcoord
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
)
func TestTimetickSync_sendToChannel(t *testing.T) {
tt := newTimeTickSync(nil)
tt.sendToChannel()
func TestTimetickSync(t *testing.T) {
ctx := context.Background()
ctt := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
session := &sessionutil.Session{
ServerID: 100,
}
cttm := newChannelTimeTickMsg(ctt)
tt.proxyTimeTick[1] = cttm
tt.sendToChannel()
tt.proxyTimeTick[2] = nil
tt.sendToChannel()
}
func TestTimetickSync_RemoveDdlTimeTick(t *testing.T) {
tt := newTimeTickSync(nil)
tt.AddDdlTimeTick(uint64(1), "1")
tt.AddDdlTimeTick(uint64(2), "2")
tt.RemoveDdlTimeTick(uint64(1), "1")
assert.Equal(t, tt.ddlMinTs, uint64(2))
}
func TestTimetickSync_UpdateTimeTick(t *testing.T) {
tt := newTimeTickSync(nil)
ctt := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
DefaultTimestamp: 0,
}
err := tt.UpdateTimeTick(ctt, "1")
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
assert.Nil(t, err)
ctt.ChannelNames = append(ctt.ChannelNames, "a")
err = tt.UpdateTimeTick(ctt, "1")
assert.Error(t, err)
//chanMap := map[typeutil.UniqueID][]string{
// int64(1): {"rootcoord-dml_0"},
//}
core := &Core{
ctx: context.TODO(),
cancel: nil,
ddlLock: sync.Mutex{},
msFactory: nil,
session: &sessionutil.Session{
ServerID: 100,
},
}
tt.core = core
Params.DmlChannelNum = 2
Params.DmlChannelName = "rootcoord-dml"
Params.DeltaChannelName = "rootcoord-delta"
ttSync := newTimeTickSync(ctx, session, factory, nil)
ctt.Timestamps = append(ctt.Timestamps, uint64(2))
ctt.Base.SourceID = int64(1)
cttm := newChannelTimeTickMsg(ctt)
tt.proxyTimeTick[ctt.Base.SourceID] = cttm
ctt.DefaultTimestamp = uint64(200)
tt.ddlMinTs = uint64(100)
err = tt.UpdateTimeTick(ctt, "1")
assert.Nil(t, err)
t.Run("sendToChannel", func(t *testing.T) {
ttSync.sendToChannel()
tt.ddlMinTs = uint64(300)
tt.proxyTimeTick[ctt.Base.SourceID].in.DefaultTimestamp = uint64(1)
tt.core.session.ServerID = int64(1)
err = tt.UpdateTimeTick(ctt, "1")
assert.Nil(t, err)
}
func Test_minTimeTick(t *testing.T) {
tts := make([]uint64, 2)
tts[0] = uint64(5)
tts[1] = uint64(3)
ret := minTimeTick(tts...)
assert.Equal(t, ret, tts[1])
ttSync.proxyTimeTick[1] = nil
ttSync.sendToChannel()
msg := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
}
ttSync.proxyTimeTick[1] = newChannelTimeTickMsg(msg)
ttSync.sendToChannel()
})
t.Run("RemoveDdlTimeTick", func(t *testing.T) {
ttSync.addDdlTimeTick(uint64(1), "1")
ttSync.addDdlTimeTick(uint64(2), "2")
ttSync.removeDdlTimeTick(uint64(1), "1")
assert.Equal(t, ttSync.ddlMinTs, uint64(2))
})
t.Run("UpdateTimeTick", func(t *testing.T) {
msg := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
SourceID: int64(1),
},
DefaultTimestamp: 0,
}
err := ttSync.updateTimeTick(msg, "1")
assert.Nil(t, err)
msg.ChannelNames = append(msg.ChannelNames, "a")
err = ttSync.updateTimeTick(msg, "1")
assert.Error(t, err)
msg.Timestamps = append(msg.Timestamps, uint64(2))
msg.DefaultTimestamp = uint64(200)
cttMsg := newChannelTimeTickMsg(msg)
ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg
ttSync.ddlMinTs = uint64(100)
err = ttSync.updateTimeTick(msg, "1")
assert.Nil(t, err)
ttSync.ddlMinTs = uint64(300)
ttSync.session.ServerID = int64(1)
err = ttSync.updateTimeTick(msg, "1")
assert.Nil(t, err)
})
t.Run("minTimeTick", func(t *testing.T) {
tts := make([]uint64, 2)
tts[0] = uint64(5)
tts[1] = uint64(3)
ret := minTimeTick(tts...)
assert.Equal(t, ret, tts[1])
})
}