Resend unsuccessful ddMsg when master start (#5214)

Resend unsuccessful ddMsg when master start

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5236/head
Cai Yudong 2021-05-14 21:26:06 +08:00 committed by GitHub
parent 9ffb42d448
commit 08bb1b2ec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 428 additions and 273 deletions

View File

@ -94,28 +94,28 @@ func TestGrpcService(t *testing.T) {
return nil
}
createCollectionArray := make([]*internalpb.CreateCollectionRequest, 0, 16)
core.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
core.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
t.Logf("Create Colllection %s", req.CollectionName)
createCollectionArray = append(createCollectionArray, req)
return nil
}
dropCollectionArray := make([]*internalpb.DropCollectionRequest, 0, 16)
core.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
core.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
t.Logf("Drop Collection %s", req.CollectionName)
dropCollectionArray = append(dropCollectionArray, req)
return nil
}
createPartitionArray := make([]*internalpb.CreatePartitionRequest, 0, 16)
core.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
core.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
t.Logf("Create Partition %s", req.PartitionName)
createPartitionArray = append(createPartitionArray, req)
return nil
}
dropPartitionArray := make([]*internalpb.DropPartitionRequest, 0, 16)
core.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
core.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
t.Logf("Drop Partition %s", req.PartitionName)
dropPartitionArray = append(dropPartitionArray, req)
return nil

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
@ -52,6 +53,13 @@ import (
// ------------------ struct -----------------------
// DdOperation used to save ddMsg into ETCD
type DdOperation struct {
Body string `json:"body"`
Body1 string `json:"body1"` // used for CreateCollectionReq only
Type string `json:"type"`
}
// master core
type Core struct {
/*
@ -89,16 +97,16 @@ type Core struct {
SendTimeTick func(t typeutil.Timestamp) error
//setMsgStreams, send create collection into dd channel
DdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest) error
SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest) error
//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
DdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest) error
SendDdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest) error
//setMsgStreams, send create partition into dd channel
DdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest) error
SendDdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest) error
//setMsgStreams, send drop partition into dd channel
DdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error
SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error
//setMsgStreams segment channel, receive segment info from data service, if master create segment
DataServiceSegmentChan chan *datapb.SegmentInfo
@ -196,17 +204,17 @@ func (c *Core) checkInit() error {
if c.ddReqQueue == nil {
return fmt.Errorf("ddReqQueue is nil")
}
if c.DdCreateCollectionReq == nil {
return fmt.Errorf("DdCreateCollectionReq is nil")
if c.SendDdCreateCollectionReq == nil {
return fmt.Errorf("SendDdCreateCollectionReq is nil")
}
if c.DdDropCollectionReq == nil {
return fmt.Errorf("DdDropCollectionReq is nil")
if c.SendDdDropCollectionReq == nil {
return fmt.Errorf("SendDdDropCollectionReq is nil")
}
if c.DdCreatePartitionReq == nil {
return fmt.Errorf("DdCreatePartitionReq is nil")
if c.SendDdCreatePartitionReq == nil {
return fmt.Errorf("SendDdCreatePartitionReq is nil")
}
if c.DdDropPartitionReq == nil {
return fmt.Errorf("DdDropPartitionReq is nil")
if c.SendDdDropPartitionReq == nil {
return fmt.Errorf("SendDdDropPartitionReq is nil")
}
if c.DataServiceSegmentChan == nil {
return fmt.Errorf("DataServiceSegmentChan is nil")
@ -403,6 +411,24 @@ func (c *Core) tsLoop() {
}
}
}
func (c *Core) setDdMsgSendFlag(b bool) error {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix)
if err != nil {
return err
}
if (b && flag == "true") || (!b && flag == "false") {
log.Debug("DdMsg send flag need not change", zap.String("flag", flag))
return nil
}
if b {
return c.MetaTable.client.Save(DDMsgSendPrefix, "true")
}
return c.MetaTable.client.Save(DDMsgSendPrefix, "false")
}
func (c *Core) setMsgStreams() error {
if Params.PulsarAddress == "" {
return fmt.Errorf("PulsarAddress is empty")
@ -476,7 +502,7 @@ func (c *Core) setMsgStreams() error {
return nil
}
c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -495,7 +521,7 @@ func (c *Core) setMsgStreams() error {
return nil
}
c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -514,7 +540,7 @@ func (c *Core) setMsgStreams() error {
return nil
}
c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -533,7 +559,7 @@ func (c *Core) setMsgStreams() error {
return nil
}
c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
Ctx: ctx,
@ -841,6 +867,72 @@ func (c *Core) Init() error {
return initError
}
func (c *Core) reSendDdMsg(ctx context.Context) error {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix)
if err != nil || flag == "true" {
log.Debug("No un-successful DdMsg")
return nil
}
ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix)
if err != nil {
log.Debug("DdOperation key does not exist")
return nil
}
var ddOp DdOperation
if err = json.Unmarshal([]byte(ddOpStr), &ddOp); err != nil {
return err
}
switch ddOp.Type {
case CreateCollectionDDType:
var ddCollReq = internalpb.CreateCollectionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddCollReq); err != nil {
return err
}
// TODO: can optimize
var ddPartReq = internalpb.CreatePartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body1, &ddPartReq); err != nil {
return err
}
if err = c.SendDdCreateCollectionReq(ctx, &ddCollReq); err != nil {
return err
}
if err = c.SendDdCreatePartitionReq(ctx, &ddPartReq); err != nil {
return err
}
case DropCollectionDDType:
var ddReq = internalpb.DropCollectionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
if err = c.SendDdDropCollectionReq(ctx, &ddReq); err != nil {
return err
}
case CreatePartitionDDType:
var ddReq = internalpb.CreatePartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
if err = c.SendDdCreatePartitionReq(ctx, &ddReq); err != nil {
return err
}
case DropPartitionDDType:
var ddReq = internalpb.DropPartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
if err = c.SendDdDropPartitionReq(ctx, &ddReq); err != nil {
return err
}
default:
return fmt.Errorf("Invalid DdOperation %s", ddOp.Type)
}
// Update DDOperation in etcd
return c.setDdMsgSendFlag(true)
}
func (c *Core) Start() error {
if err := c.checkInit(); err != nil {
return err
@ -851,6 +943,9 @@ func (c *Core) Start() error {
log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
c.startOnce.Do(func() {
if err := c.reSendDdMsg(c.ctx); err != nil {
return
}
go c.startDdScheduler()
go c.startTimeTickLoop()
go c.startDataServiceSegmentLoop()

View File

@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
@ -406,25 +405,27 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, createMsg.CollectionID, createMeta.ID)
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, CreateCollectionMsgType, msgType)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
assert.Nil(t, err)
assert.Equal(t, CreateCollectionDDType, ddOp.Type)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
var ddCollReq = internalpb.CreateCollectionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddCollReq)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, ddCollReq.CollectionID)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, createMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
var ddPartReq = internalpb.CreatePartitionRequest{}
err = proto.UnmarshalText(ddOp.Body1, &ddPartReq)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, collInfo.ID)
assert.Equal(t, createMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, createMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
assert.Equal(t, createMeta.ID, ddPartReq.CollectionID)
assert.Equal(t, createMeta.PartitionIDs[0], ddPartReq.PartitionID)
})
t.Run("has collection", func(t *testing.T) {
@ -547,33 +548,22 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, 1, len(pm.GetCollArray()))
assert.Equal(t, "testColl", pm.GetCollArray()[0])
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, CreatePartitionMsgType, msgType)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
assert.Nil(t, err)
assert.Equal(t, CreatePartitionDDType, ddOp.Type)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
var ddReq = internalpb.CreatePartitionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
assert.Nil(t, err)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collInfo.ID)
assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID)
v2 := meta[k2]
var partInfo etcdpb.PartitionInfo
err = proto.UnmarshalText(v2, &partInfo)
assert.Nil(t, err)
assert.Equal(t, partMeta.PartitionName, partInfo.PartitionName)
assert.Equal(t, partMeta.PartitionID, partInfo.PartitionID)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, partMeta.PartitionID, ddReq.PartitionID)
})
t.Run("has partition", func(t *testing.T) {
@ -964,25 +954,22 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, 2, len(pm.GetCollArray()))
assert.Equal(t, "testColl", pm.GetCollArray()[1])
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, DropPartitionMsgType, msgType)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
assert.Nil(t, err)
assert.Equal(t, DropPartitionDDType, ddOp.Type)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
var ddReq = internalpb.DropPartitionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
assert.Nil(t, err)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collInfo.ID)
assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, dropPartID, ddReq.PartitionID)
})
t.Run("drop collection", func(t *testing.T) {
@ -1037,17 +1024,21 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, len(collArray), 3)
assert.Equal(t, collArray[2], "testColl")
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, DropCollectionMsgType, msgType)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
assert.Nil(t, err)
assert.Equal(t, DropCollectionDDType, ddOp.Type)
var collID typeutil.UniqueID
collIDByte, err := core.MetaTable.client.Load(DDMsgPrefix)
var ddReq = internalpb.DropCollectionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
assert.Nil(t, err)
err = json.Unmarshal([]byte(collIDByte), &collID)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collID)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
})
t.Run("context_cancel", func(t *testing.T) {
@ -1665,25 +1656,25 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
return nil
}
err = c.checkInit()

View File

@ -12,7 +12,7 @@
package masterservice
import (
"encoding/json"
"errors"
"fmt"
"path"
"strconv"
@ -21,16 +21,13 @@ import (
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"errors"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
@ -42,14 +39,13 @@ const (
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
IndexMetaPrefix = ComponentPrefix + "/index"
DDMsgPrefix = ComponentPrefix + "/dd-msg"
DDMsgTypePrefix = ComponentPrefix + "/dd-msg-type"
DDMsgFlagPrefix = ComponentPrefix + "/dd-msg-flag"
DDOperationPrefix = ComponentPrefix + "/dd-operation"
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
CreateCollectionMsgType = "CreateCollection"
DropCollectionMsgType = "DropCollection"
CreatePartitionMsgType = "CreatePartition"
DropPartitionMsgType = "DropPartition"
CreateCollectionDDType = "CreateCollection"
DropCollectionDDType = "DropCollection"
CreatePartitionDDType = "CreatePartition"
DropPartitionDDType = "DropPartition"
)
type metaTable struct {
@ -232,7 +228,7 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error {
return nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error {
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -270,16 +266,11 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
meta[k] = v
}
// record ddmsg info and type
ddmsg, err := json.Marshal(meta)
if err != nil {
return err
}
meta[DDMsgPrefix] = string(ddmsg)
meta[DDMsgTypePrefix] = CreateCollectionMsgType
meta[DDMsgFlagPrefix] = "false"
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err = mt.client.MultiSave(meta)
err := mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -288,7 +279,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
return nil
}
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error {
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -333,18 +324,13 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error {
fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
}
// record ddmsg info and type
ddmsg, err := json.Marshal(collID)
if err != nil {
return err
}
saveMeta := map[string]string{
DDMsgPrefix: string(ddmsg),
DDMsgTypePrefix: DropCollectionMsgType,
DDMsgFlagPrefix: "false",
// save ddOpStr into etcd
var saveMeta = map[string]string{
DDOperationPrefix: ddOpStr,
DDMsgSendPrefix: "false",
}
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -416,7 +402,7 @@ func (mt *metaTable) ListCollections() ([]string, error) {
return colls, nil
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID) error {
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
@ -457,16 +443,11 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
v2 := proto.MarshalTextString(&partMeta)
meta := map[string]string{k1: v1, k2: v2}
// record ddmsg info and type
ddmsg, err := json.Marshal(meta)
if err != nil {
return err
}
meta[DDMsgPrefix] = string(ddmsg)
meta[DDMsgTypePrefix] = CreatePartitionMsgType
meta[DDMsgFlagPrefix] = "false"
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err = mt.client.MultiSave(meta)
err := mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -474,25 +455,34 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
return nil
}
func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) {
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID)
}
for _, id := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[id]
if ok && partMeta.PartitionName == partitionName {
return partMeta, nil
}
}
return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName)
}
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.getPartitionByName(collID, partitionName)
}
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
col, ok := mt.collID2Meta[collID]
if !ok {
return false
}
for _, partitionID := range col.PartitionIDs {
meta, ok := mt.partitionID2Meta[partitionID]
if ok {
if meta.PartitionName == partitionName {
return true
}
}
}
return false
_, err := mt.getPartitionByName(collID, partitionName)
return err == nil
}
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string) (typeutil.UniqueID, error) {
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -540,7 +530,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
delete(mt.segID2IndexMeta, segID)
}
collKV := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
delMetaKeys := []string{
fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID),
}
@ -549,16 +539,11 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
delMetaKeys = append(delMetaKeys, k)
}
// record ddmsg info and type
ddmsg, err := json.Marshal(collKV)
if err != nil {
return 0, err
}
collKV[DDMsgPrefix] = string(ddmsg)
collKV[DDMsgTypePrefix] = DropPartitionMsgType
collKV[DDMsgFlagPrefix] = "false"
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err = mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys)
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys)
if err != nil {
_ = mt.reloadFromKV()
return 0, err

View File

@ -147,6 +147,15 @@ func Test_MockKV(t *testing.T) {
}
func TestMetaTable(t *testing.T) {
const collID = typeutil.UniqueID(1)
const collIDInvalid = typeutil.UniqueID(2)
const partIDDefault = typeutil.UniqueID(10)
const partID = typeutil.UniqueID(20)
const partIDInvalid = typeutil.UniqueID(21)
const segID = typeutil.UniqueID(100)
const segID2 = typeutil.UniqueID(101)
const fieldID = typeutil.UniqueID(110)
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
@ -161,14 +170,14 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo := &pb.CollectionInfo{
ID: 1,
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 110,
FieldID: fieldID,
Name: "field110",
IsPrimaryKey: false,
Description: "",
@ -198,16 +207,21 @@ func TestMetaTable(t *testing.T) {
},
FieldIndexes: []*pb.FieldIndexInfo{
{
FiledID: 110,
FiledID: fieldID,
IndexID: 10000,
},
},
CreateTime: 0,
PartitionIDs: nil,
}
partInfoDefault := &pb.PartitionInfo{
PartitionName: "_default",
PartitionID: partIDDefault,
SegmentIDs: nil,
}
partInfo := &pb.PartitionInfo{
PartitionName: "testPart",
PartitionID: 10,
PartitionID: partID,
SegmentIDs: nil,
}
idxInfo := []*pb.IndexInfo{
@ -228,55 +242,69 @@ func TestMetaTable(t *testing.T) {
}
t.Run("add collection", func(t *testing.T) {
partInfo.SegmentIDs = []int64{100}
err = mt.AddCollection(collInfo, partInfo, idxInfo)
partInfoDefault.SegmentIDs = []int64{segID}
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.NotNil(t, err)
partInfo.SegmentIDs = []int64{}
partInfoDefault.SegmentIDs = []int64{}
collInfo.PartitionIDs = []int64{100}
err = mt.AddCollection(collInfo, partInfo, idxInfo)
collInfo.PartitionIDs = []int64{segID}
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.NotNil(t, err)
collInfo.PartitionIDs = []int64{}
err = mt.AddCollection(collInfo, partInfo, nil)
err = mt.AddCollection(collInfo, partInfoDefault, nil, "")
assert.NotNil(t, err)
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.Nil(t, err)
collMeta, err := mt.GetCollectionByName("testColl")
assert.Nil(t, err)
assert.Equal(t, collMeta.PartitionIDs[0], int64(10))
assert.Equal(t, collMeta.PartitionIDs[0], partIDDefault)
assert.Equal(t, len(collMeta.PartitionIDs), 1)
assert.True(t, mt.HasCollection(collInfo.ID))
field, err := mt.GetFieldSchema("testColl", "field110")
assert.Nil(t, err)
assert.Equal(t, field.FieldID, collInfo.Schema.Fields[0].FieldID)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
t.Run("add partition", func(t *testing.T) {
assert.Nil(t, mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, ""))
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
t.Run("add segment", func(t *testing.T) {
seg := &datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 10,
ID: segID,
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
assert.NotNil(t, mt.AddSegment(seg))
seg.ID = 101
seg.CollectionID = 2
seg.ID = segID2
seg.CollectionID = collIDInvalid
assert.NotNil(t, mt.AddSegment(seg))
seg.CollectionID = 1
seg.PartitionID = 11
seg.CollectionID = collID
seg.PartitionID = partIDInvalid
assert.NotNil(t, mt.AddSegment(seg))
seg.PartitionID = 10
seg.PartitionID = partID
assert.Nil(t, mt.AddSegment(seg))
})
t.Run("add segment index", func(t *testing.T) {
seg := pb.SegmentIndexInfo{
SegmentID: 100,
FieldID: 110,
SegmentID: segID,
FieldID: fieldID,
IndexID: 10000,
BuildID: 201,
}
@ -318,7 +346,7 @@ func TestMetaTable(t *testing.T) {
seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo)
assert.Nil(t, err)
assert.Equal(t, len(seg), 1)
assert.Equal(t, seg[0], int64(101))
assert.Equal(t, seg[0], segID2)
assert.True(t, EqualKeyPairArray(field.TypeParams, tparams))
params = []*commonpb.KeyValuePair{
@ -334,8 +362,8 @@ func TestMetaTable(t *testing.T) {
seg, field, err = mt.GetNotIndexedSegments("testColl", "field110", idxInfo)
assert.Nil(t, err)
assert.Equal(t, len(seg), 2)
assert.Equal(t, seg[0], int64(100))
assert.Equal(t, seg[1], int64(101))
assert.Equal(t, seg[0], segID)
assert.Equal(t, seg[1], segID2)
assert.True(t, EqualKeyPairArray(field.TypeParams, tparams))
})
@ -397,16 +425,31 @@ func TestMetaTable(t *testing.T) {
assert.Equal(t, len(idxs), 1)
assert.Equal(t, idxs[0].IndexID, int64(2001))
_, err = mt.GetSegmentIndexInfoByID(100, -1, "")
_, err = mt.GetSegmentIndexInfoByID(segID, -1, "")
assert.NotNil(t, err)
})
t.Run("drop partition", func(t *testing.T) {
id, err := mt.DeletePartition(collID, partInfo.PartitionName, "")
assert.Nil(t, err)
assert.Equal(t, partID, id)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
t.Run("drop collection", func(t *testing.T) {
err := mt.DeleteCollection(2)
err := mt.DeleteCollection(collIDInvalid, "")
assert.NotNil(t, err)
err = mt.DeleteCollection(1)
err = mt.DeleteCollection(collID, "")
assert.Nil(t, err)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
/////////////////////////// these tests should run at last, it only used to hit the error lines ////////////////////////
@ -421,7 +464,7 @@ func TestMetaTable(t *testing.T) {
return fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo)
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
})
@ -434,11 +477,11 @@ func TestMetaTable(t *testing.T) {
return fmt.Errorf("milti save and remove with prefix error")
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo)
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
err = mt.DeleteCollection(collInfo.ID)
err = mt.DeleteCollection(collInfo.ID, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "milti save and remove with prefix error")
})
@ -449,13 +492,13 @@ func TestMetaTable(t *testing.T) {
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo)
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 10,
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
@ -485,17 +528,17 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
err = mt.AddPartition(2, "no-part", 22)
err = mt.AddPartition(2, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
err = mt.AddPartition(coll.ID, "no-part", 22)
err = mt.AddPartition(coll.ID, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
@ -505,7 +548,7 @@ func TestMetaTable(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
}
err = mt.AddPartition(coll.ID, "no-part", 22)
err = mt.AddPartition(coll.ID, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
@ -513,13 +556,13 @@ func TestMetaTable(t *testing.T) {
return nil
}
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22)
err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName))
err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID)
err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID))
})
@ -535,7 +578,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
@ -556,14 +599,14 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName)
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "default partition cannot be deleted")
_, err = mt.DeletePartition(collInfo.ID, "abc")
_, err = mt.DeletePartition(collInfo.ID, "abc", "")
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
@ -573,12 +616,12 @@ func TestMetaTable(t *testing.T) {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error {
return fmt.Errorf("multi save and remove with prefix error")
}
_, err = mt.DeletePartition(collInfo.ID, pm.PartitionName)
_, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save and remove with prefix error")
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.DeletePartition(collInfo.ID, "abc")
_, err = mt.DeletePartition(collInfo.ID, "abc", "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID))
@ -598,7 +641,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
noPart := pb.PartitionInfo{
@ -644,13 +687,13 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 10,
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
@ -683,7 +726,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
assert.Nil(t, mt.AddSegment(seg))
@ -710,7 +753,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, _, err = mt.DropIndex("abc", "abc", "abc")
@ -747,7 +790,7 @@ func TestMetaTable(t *testing.T) {
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error {
@ -772,7 +815,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, err = mt.GetSegmentIndexInfoByID(101, 101, "abc")
@ -793,8 +836,8 @@ func TestMetaTable(t *testing.T) {
segInfo := &datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 10,
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(segInfo))
segIdx := &pb.SegmentIndexInfo{
@ -831,7 +874,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -902,7 +945,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx)
@ -927,7 +970,7 @@ func TestMetaTable(t *testing.T) {
return nil
}
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
coll := mt.collID2Meta[collInfo.ID]
coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1})
@ -975,7 +1018,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo)
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName)

View File

@ -122,11 +122,11 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partitionID, _, err := t.core.idAllocator(1)
partID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
collMeta := etcdpb.CollectionInfo{
collInfo := etcdpb.CollectionInfo{
ID: collID,
Schema: &schema,
CreateTime: collTs,
@ -135,9 +135,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
// every collection has _default partition
partMeta := etcdpb.PartitionInfo{
partInfo := etcdpb.PartitionInfo{
PartitionName: Params.DefaultPartitionName,
PartitionID: partitionID,
PartitionID: partID,
SegmentIDs: make([]typeutil.UniqueID, 0, 16),
}
idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
@ -164,11 +164,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
// }
//}
err = t.core.MetaTable.AddCollection(&collMeta, &partMeta, idxInfo)
if err != nil {
return err
}
// schema is modified (add RowIDField and TimestampField),
// so need Marshal again
schemaBytes, err := proto.Marshal(&schema)
@ -176,7 +171,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err
}
ddReq := internalpb.CreateCollectionRequest{
ddCollReq := internalpb.CreateCollectionRequest{
Base: t.Req.Base,
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
@ -185,12 +180,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
Schema: schemaBytes,
}
err = t.core.DdCreateCollectionReq(ctx, &ddReq)
if err != nil {
return err
}
ddPart := internalpb.CreatePartitionRequest{
ddPartReq := internalpb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: t.Req.Base.MsgID, //TODO, msg id
@ -201,19 +191,33 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
CollectionName: t.Req.CollectionName,
PartitionName: Params.DefaultPartitionName,
DbID: 0, //TODO, not used
CollectionID: collMeta.ID,
PartitionID: partMeta.PartitionID,
CollectionID: collInfo.ID,
PartitionID: partInfo.PartitionID,
}
err = t.core.DdCreatePartitionReq(ctx, &ddPart)
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType)
if err != nil {
return err
}
// Marking DDMsgFlagPrefix to true means ddMsg has been send successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr)
if err != nil {
return err
}
return nil
err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq)
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddPartReq)
if err != nil {
return err
}
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
type DropCollectionReqTask struct {
@ -246,16 +250,6 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
if err = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil {
return err
}
err = t.core.MetaTable.DeleteCollection(collMeta.ID)
if err != nil {
return err
}
//data service should drop segments , which belong to this collection, from the segment manager
ddReq := internalpb.DropCollectionRequest{
Base: t.Req.Base,
@ -265,7 +259,19 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
CollectionID: collMeta.ID,
}
err = t.core.DdDropCollectionReq(ctx, &ddReq)
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropCollectionDDType)
if err != nil {
return err
}
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr)
if err != nil {
return err
}
err = t.core.SendDdDropCollectionReq(ctx, &ddReq)
if err != nil {
return err
}
@ -277,10 +283,11 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
}
}()
// Marking DDMsgFlagPrefix to true means ddMsg has been send successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
return nil
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
type HasCollectionReqTask struct {
@ -434,11 +441,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partitionID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partitionID)
partID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
@ -450,21 +453,31 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
PartitionName: t.Req.PartitionName,
DbID: 0, // todo, not used
CollectionID: collMeta.ID,
PartitionID: partitionID,
PartitionID: partID,
}
err = t.core.DdCreatePartitionReq(ctx, &ddReq)
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, CreatePartitionDDType)
if err != nil {
return err
}
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr)
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddReq)
if err != nil {
return err
}
// error doesn't matter here
_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Marking DDMsgFlagPrefix to true means ddMsg has been send successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
type DropPartitionReqTask struct {
@ -492,11 +505,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
}
partID, err := t.core.MetaTable.DeletePartition(coll.ID, t.Req.PartitionName)
partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName)
if err != nil {
return err
}
@ -507,22 +520,32 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
CollectionName: t.Req.CollectionName,
PartitionName: t.Req.PartitionName,
DbID: 0, //todo,not used
CollectionID: coll.ID,
PartitionID: partID,
CollectionID: collInfo.ID,
PartitionID: partInfo.PartitionID,
}
err = t.core.DdDropPartitionReq(ctx, &ddReq)
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropPartitionDDType)
if err != nil {
return err
}
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr)
if err != nil {
return err
}
err = t.core.SendDdDropPartitionReq(ctx, &ddReq)
if err != nil {
return err
}
// error doesn't matter here
_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Marking DDMsgFlagPrefix to true means ddMsg has been send successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
type HasPartitionReqTask struct {

View File

@ -12,8 +12,10 @@
package masterservice
import (
"encoding/json"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
@ -66,3 +68,19 @@ func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueI
}
return GetFieldSchemaByID(coll, fieldID)
}
// EncodeDdOperation serialize DdOperation into string
func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string, error) {
mStr := proto.MarshalTextString(m)
m1Str := proto.MarshalTextString(m1)
ddOp := DdOperation{
Body: mStr,
Body1: m1Str, // used for DdCreateCollection only
Type: ddType,
}
ddOpByte, err := json.Marshal(ddOp)
if err != nil {
return "", err
}
return string(ddOpByte), nil
}