mirror of https://github.com/milvus-io/milvus.git
Save start position in collection meta (#8682)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/8696/head
parent
585989935a
commit
88e319acda
|
@ -129,11 +129,8 @@ func TestGrpcService(t *testing.T) {
|
|||
timeTickArray = append(timeTickArray, ts)
|
||||
return nil
|
||||
}
|
||||
createCollectionArray := make([]*internalpb.CreateCollectionRequest, 0, 16)
|
||||
core.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error {
|
||||
t.Logf("Create Colllection %s", req.CollectionName)
|
||||
createCollectionArray = append(createCollectionArray, req)
|
||||
return nil
|
||||
core.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error) {
|
||||
return map[string][]byte{}, nil
|
||||
}
|
||||
|
||||
dropCollectionArray := make([]*internalpb.DropCollectionRequest, 0, 16)
|
||||
|
@ -328,11 +325,14 @@ func TestGrpcService(t *testing.T) {
|
|||
|
||||
status, err := cli.CreateCollection(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
colls, err := core.MetaTable.ListCollections(0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(createCollectionArray))
|
||||
assert.Equal(t, 1, len(colls))
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
assert.Equal(t, commonpb.MsgType_CreateCollection, createCollectionArray[0].Base.MsgType)
|
||||
assert.Equal(t, collName, createCollectionArray[0].CollectionName)
|
||||
//assert.Equal(t, commonpb.MsgType_CreateCollection, createCollectionArray[0].Base.MsgType)
|
||||
_, has := colls[collName]
|
||||
assert.True(t, has)
|
||||
|
||||
req.Base.MsgID = 101
|
||||
req.Base.Timestamp = 101
|
||||
|
@ -359,10 +359,11 @@ func TestGrpcService(t *testing.T) {
|
|||
status, err = cli.CreateCollection(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
assert.Equal(t, 2, len(createCollectionArray))
|
||||
assert.Equal(t, commonpb.MsgType_CreateCollection, createCollectionArray[1].Base.MsgType)
|
||||
assert.Equal(t, collName2, createCollectionArray[1].CollectionName)
|
||||
|
||||
colls, err = core.MetaTable.ListCollections(0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 2, len(colls))
|
||||
_, has = colls[collName2]
|
||||
assert.True(t, has)
|
||||
//time stamp go back, master response to add the timestamp, so the time tick will never go back
|
||||
//schema.Name = "testColl-goback"
|
||||
//sbf, err = proto.Marshal(&schema)
|
||||
|
|
|
@ -95,6 +95,30 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err
|
|||
return nil
|
||||
}
|
||||
|
||||
// BroadcastMark broadcasts msg pack into specified channel and returns related message id
|
||||
func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) {
|
||||
result := make(map[string][]byte)
|
||||
for _, chanName := range chanNames {
|
||||
// only in-use chanName exist in refcnt
|
||||
if _, ok := d.refcnt.Load(chanName); ok {
|
||||
v, _ := d.pool.Load(chanName)
|
||||
ids, err := (*(v.(*msgstream.MsgStream))).BroadcastMark(pack)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
for chanName, idList := range ids {
|
||||
// idList should have length 1, just flat by iteration
|
||||
for _, id := range idList {
|
||||
result[chanName] = id.Serialize()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return result, fmt.Errorf("channel %s not exist", chanName)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// AddProducerChannels add named channels as producer
|
||||
func (d *dmlChannels) AddProducerChannels(names ...string) {
|
||||
for _, name := range names {
|
||||
|
|
|
@ -99,7 +99,8 @@ type Core struct {
|
|||
SendTimeTick func(t typeutil.Timestamp, reason string) error
|
||||
|
||||
//setMsgStreams, send create collection into dd channel
|
||||
SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error
|
||||
//returns corresponding message id for each channel
|
||||
SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error)
|
||||
|
||||
//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
|
||||
SendDdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error
|
||||
|
@ -503,7 +504,7 @@ func (c *Core) setMsgStreams() error {
|
|||
return c.chanTimeTick.UpdateTimeTick(&ttMsg, reason)
|
||||
}
|
||||
|
||||
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error {
|
||||
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error) {
|
||||
msgPack := ms.MsgPack{}
|
||||
baseMsg := ms.BaseMsg{
|
||||
Ctx: ctx,
|
||||
|
@ -516,7 +517,7 @@ func (c *Core) setMsgStreams() error {
|
|||
CreateCollectionRequest: *req,
|
||||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, msg)
|
||||
return c.dmlChannels.Broadcast(channelNames, &msgPack)
|
||||
return c.dmlChannels.BroadcastMark(channelNames, &msgPack)
|
||||
}
|
||||
|
||||
c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error {
|
||||
|
@ -1010,6 +1011,8 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
|||
var dbName, collName string
|
||||
|
||||
switch ddOp.Type {
|
||||
// TODO remove create collection resend
|
||||
// since create collection needs a start position to succeed
|
||||
case CreateCollectionDDType:
|
||||
var ddReq = internalpb.CreateCollectionRequest{}
|
||||
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
|
||||
|
@ -1019,7 +1022,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = false
|
||||
|
|
|
@ -715,7 +715,6 @@ func TestRootCoord(t *testing.T) {
|
|||
assert.Equal(t, pt.in.Timestamps[0], pt.in.DefaultTimestamp)
|
||||
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[0]], pt.in.DefaultTimestamp)
|
||||
assert.Equal(t, pt.timeTick[pt.in.ChannelNames[1]], pt.in.DefaultTimestamp)
|
||||
assert.LessOrEqual(t, createMsg.BeginTimestamp, pt.in.Timestamps[0])
|
||||
core.chanTimeTick.lock.Unlock()
|
||||
|
||||
// check DD operation info
|
||||
|
@ -2342,8 +2341,8 @@ func TestCheckInit(t *testing.T) {
|
|||
err = c.checkInit()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
c.SendDdCreateCollectionReq = func(context.Context, *internalpb.CreateCollectionRequest, []string) error {
|
||||
return nil
|
||||
c.SendDdCreateCollectionReq = func(context.Context, *internalpb.CreateCollectionRequest, []string) (map[string][]byte, error) {
|
||||
return map[string][]byte{}, nil
|
||||
}
|
||||
err = c.checkInit()
|
||||
assert.NotNil(t, err)
|
||||
|
|
|
@ -288,7 +288,6 @@ func (ss *suffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
|
|||
after, err := ss.checkKeyTS(key, ts)
|
||||
ss.Unlock()
|
||||
|
||||
log.Warn("", zap.Bool("after", after), zap.Error(err))
|
||||
ss.RLock()
|
||||
defer ss.RUnlock()
|
||||
// ts after latest ts, load key as acceleration
|
||||
|
|
|
@ -197,18 +197,25 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
|
|||
// clear ddl timetick in all conditions
|
||||
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
|
||||
|
||||
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("meta table add collection failed,error = %w", err)
|
||||
}
|
||||
|
||||
// add dml channel before send dd msg
|
||||
t.core.dmlChannels.AddProducerChannels(chanNames...)
|
||||
|
||||
err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
|
||||
ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
|
||||
if err != nil {
|
||||
return fmt.Errorf("send dd create collection req failed, error = %w", err)
|
||||
}
|
||||
for _, pchan := range collInfo.PhysicalChannelNames {
|
||||
collInfo.StartPositions = append(collInfo.StartPositions, &commonpb.KeyDataPair{
|
||||
Key: pchan,
|
||||
Data: ids[pchan],
|
||||
})
|
||||
}
|
||||
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp)
|
||||
if err != nil {
|
||||
t.core.dmlChannels.RemoveProducerChannels(chanNames...)
|
||||
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
|
||||
return fmt.Errorf("meta table add collection failed,error = %w", err)
|
||||
}
|
||||
|
||||
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
|
@ -397,6 +404,7 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
|
|||
createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime)
|
||||
t.Rsp.CreatedUtcTimestamp = createdPhysicalTime
|
||||
t.Rsp.Aliases = t.core.MetaTable.ListAliases(collInfo.ID)
|
||||
t.Rsp.StartPositions = collInfo.GetStartPositions()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue