From 08bb1b2ec302301e8b77be7d3c93202e735496b6 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 14 May 2021 21:26:06 +0800 Subject: [PATCH] Resend unsuccessful ddMsg when master start (#5214) Resend unsuccessful ddMsg when master start Signed-off-by: yudong.cai --- .../masterservice/masterservice_test.go | 8 +- internal/masterservice/master_service.go | 127 ++++++++++++-- internal/masterservice/master_service_test.go | 125 ++++++------- internal/masterservice/meta_table.go | 119 ++++++------- internal/masterservice/meta_table_test.go | 165 +++++++++++------- internal/masterservice/task.go | 139 +++++++++------ internal/masterservice/util.go | 18 ++ 7 files changed, 428 insertions(+), 273 deletions(-) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 5fecf61b4b..be0a108563 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -94,28 +94,28 @@ func TestGrpcService(t *testing.T) { return nil } createCollectionArray := make([]*internalpb.CreateCollectionRequest, 0, 16) - core.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { + core.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { t.Logf("Create Colllection %s", req.CollectionName) createCollectionArray = append(createCollectionArray, req) return nil } dropCollectionArray := make([]*internalpb.DropCollectionRequest, 0, 16) - core.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { + core.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { t.Logf("Drop Collection %s", req.CollectionName) dropCollectionArray = append(dropCollectionArray, req) return nil } createPartitionArray := make([]*internalpb.CreatePartitionRequest, 0, 16) - core.DdCreatePartitionReq = func(ctx 
context.Context, req *internalpb.CreatePartitionRequest) error { + core.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { t.Logf("Create Partition %s", req.PartitionName) createPartitionArray = append(createPartitionArray, req) return nil } dropPartitionArray := make([]*internalpb.DropPartitionRequest, 0, 16) - core.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { + core.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { t.Logf("Drop Partition %s", req.PartitionName) dropPartitionArray = append(dropPartitionArray, req) return nil diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index fc6bf595a0..d5aea20160 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/allocator" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" @@ -52,6 +53,13 @@ import ( // ------------------ struct ----------------------- +// DdOperation used to save ddMsg into ETCD +type DdOperation struct { + Body string `json:"body"` + Body1 string `json:"body1"` // used for CreateCollectionReq only + Type string `json:"type"` +} + // master core type Core struct { /* @@ -89,16 +97,16 @@ type Core struct { SendTimeTick func(t typeutil.Timestamp) error //setMsgStreams, send create collection into dd channel - DdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest) error + SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest) error //setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection - DdDropCollectionReq func(ctx context.Context, req 
*internalpb.DropCollectionRequest) error + SendDdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest) error //setMsgStreams, send create partition into dd channel - DdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest) error + SendDdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest) error //setMsgStreams, send drop partition into dd channel - DdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error + SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error //setMsgStreams segment channel, receive segment info from data service, if master create segment DataServiceSegmentChan chan *datapb.SegmentInfo @@ -196,17 +204,17 @@ func (c *Core) checkInit() error { if c.ddReqQueue == nil { return fmt.Errorf("ddReqQueue is nil") } - if c.DdCreateCollectionReq == nil { - return fmt.Errorf("DdCreateCollectionReq is nil") + if c.SendDdCreateCollectionReq == nil { + return fmt.Errorf("SendDdCreateCollectionReq is nil") } - if c.DdDropCollectionReq == nil { - return fmt.Errorf("DdDropCollectionReq is nil") + if c.SendDdDropCollectionReq == nil { + return fmt.Errorf("SendDdDropCollectionReq is nil") } - if c.DdCreatePartitionReq == nil { - return fmt.Errorf("DdCreatePartitionReq is nil") + if c.SendDdCreatePartitionReq == nil { + return fmt.Errorf("SendDdCreatePartitionReq is nil") } - if c.DdDropPartitionReq == nil { - return fmt.Errorf("DdDropPartitionReq is nil") + if c.SendDdDropPartitionReq == nil { + return fmt.Errorf("SendDdDropPartitionReq is nil") } if c.DataServiceSegmentChan == nil { return fmt.Errorf("DataServiceSegmentChan is nil") @@ -403,6 +411,24 @@ func (c *Core) tsLoop() { } } } + +func (c *Core) setDdMsgSendFlag(b bool) error { + flag, err := c.MetaTable.client.Load(DDMsgSendPrefix) + if err != nil { + return err + } + + if (b && flag == "true") || (!b && flag == "false") { + log.Debug("DdMsg send 
flag need not change", zap.String("flag", flag)) + return nil + } + + if b { + return c.MetaTable.client.Save(DDMsgSendPrefix, "true") + } + return c.MetaTable.client.Save(DDMsgSendPrefix, "false") +} + func (c *Core) setMsgStreams() error { if Params.PulsarAddress == "" { return fmt.Errorf("PulsarAddress is empty") @@ -476,7 +502,7 @@ func (c *Core) setMsgStreams() error { return nil } - c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { + c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -495,7 +521,7 @@ func (c *Core) setMsgStreams() error { return nil } - c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { + c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -514,7 +540,7 @@ func (c *Core) setMsgStreams() error { return nil } - c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { + c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -533,7 +559,7 @@ func (c *Core) setMsgStreams() error { return nil } - c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { + c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ Ctx: ctx, @@ -841,6 +867,72 @@ func (c *Core) Init() error { return initError } +func (c *Core) reSendDdMsg(ctx context.Context) error { + flag, err := c.MetaTable.client.Load(DDMsgSendPrefix) + if err != nil || flag == "true" { + log.Debug("No un-successful DdMsg") + return nil + } + + ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix) + if err != 
nil { + log.Debug("DdOperation key does not exist") + return nil + } + var ddOp DdOperation + if err = json.Unmarshal([]byte(ddOpStr), &ddOp); err != nil { + return err + } + + switch ddOp.Type { + case CreateCollectionDDType: + var ddCollReq = internalpb.CreateCollectionRequest{} + if err = proto.UnmarshalText(ddOp.Body, &ddCollReq); err != nil { + return err + } + // TODO: can optimize + var ddPartReq = internalpb.CreatePartitionRequest{} + if err = proto.UnmarshalText(ddOp.Body1, &ddPartReq); err != nil { + return err + } + if err = c.SendDdCreateCollectionReq(ctx, &ddCollReq); err != nil { + return err + } + if err = c.SendDdCreatePartitionReq(ctx, &ddPartReq); err != nil { + return err + } + case DropCollectionDDType: + var ddReq = internalpb.DropCollectionRequest{} + if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + return err + } + if err = c.SendDdDropCollectionReq(ctx, &ddReq); err != nil { + return err + } + case CreatePartitionDDType: + var ddReq = internalpb.CreatePartitionRequest{} + if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + return err + } + if err = c.SendDdCreatePartitionReq(ctx, &ddReq); err != nil { + return err + } + case DropPartitionDDType: + var ddReq = internalpb.DropPartitionRequest{} + if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { + return err + } + if err = c.SendDdDropPartitionReq(ctx, &ddReq); err != nil { + return err + } + default: + return fmt.Errorf("Invalid DdOperation %s", ddOp.Type) + } + + // Update DDOperation in etcd + return c.setDdMsgSendFlag(true) +} + func (c *Core) Start() error { if err := c.checkInit(); err != nil { return err @@ -851,6 +943,9 @@ func (c *Core) Start() error { log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) c.startOnce.Do(func() { + if err := c.reSendDdMsg(c.ctx); err != nil { + return + } go c.startDdScheduler() go c.startTimeTickLoop() go c.startDataServiceSegmentLoop() diff --git 
a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 3cacf8c199..4101bb9e3e 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -25,7 +25,6 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" @@ -406,25 +405,27 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) assert.Equal(t, createMsg.CollectionID, createMeta.ID) - // check DDMsg type and info - msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + // check DD operation info + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) assert.Nil(t, err) - assert.Equal(t, CreateCollectionMsgType, msgType) + assert.Equal(t, "true", flag) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + assert.Nil(t, err) + var ddOp DdOperation + err = json.Unmarshal([]byte(ddOpStr), &ddOp) + assert.Nil(t, err) + assert.Equal(t, CreateCollectionDDType, ddOp.Type) - var meta map[string]string - metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) - assert.Nil(t, err) - err = json.Unmarshal([]byte(metaStr), &meta) + var ddCollReq = internalpb.CreateCollectionRequest{} + err = proto.UnmarshalText(ddOp.Body, &ddCollReq) assert.Nil(t, err) + assert.Equal(t, createMeta.ID, ddCollReq.CollectionID) - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, createMeta.ID) - v1 := meta[k1] - var collInfo etcdpb.CollectionInfo - err = proto.UnmarshalText(v1, &collInfo) + var ddPartReq = internalpb.CreatePartitionRequest{} + err = proto.UnmarshalText(ddOp.Body1, &ddPartReq) assert.Nil(t, err) - assert.Equal(t, createMeta.ID, collInfo.ID) - assert.Equal(t, createMeta.CreateTime, 
collInfo.CreateTime) - assert.Equal(t, createMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) + assert.Equal(t, createMeta.ID, ddPartReq.CollectionID) + assert.Equal(t, createMeta.PartitionIDs[0], ddPartReq.PartitionID) }) t.Run("has collection", func(t *testing.T) { @@ -547,33 +548,22 @@ func TestMasterService(t *testing.T) { assert.Equal(t, 1, len(pm.GetCollArray())) assert.Equal(t, "testColl", pm.GetCollArray()[0]) - // check DDMsg type and info - msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + // check DD operation info + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) assert.Nil(t, err) - assert.Equal(t, CreatePartitionMsgType, msgType) + assert.Equal(t, "true", flag) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + assert.Nil(t, err) + var ddOp DdOperation + err = json.Unmarshal([]byte(ddOpStr), &ddOp) + assert.Nil(t, err) + assert.Equal(t, CreatePartitionDDType, ddOp.Type) - var meta map[string]string - metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) + var ddReq = internalpb.CreatePartitionRequest{} + err = proto.UnmarshalText(ddOp.Body, &ddReq) assert.Nil(t, err) - err = json.Unmarshal([]byte(metaStr), &meta) - assert.Nil(t, err) - - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID) - v1 := meta[k1] - var collInfo etcdpb.CollectionInfo - err = proto.UnmarshalText(v1, &collInfo) - assert.Nil(t, err) - assert.Equal(t, collMeta.ID, collInfo.ID) - assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime) - assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) - - k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID) - v2 := meta[k2] - var partInfo etcdpb.PartitionInfo - err = proto.UnmarshalText(v2, &partInfo) - assert.Nil(t, err) - assert.Equal(t, partMeta.PartitionName, partInfo.PartitionName) - assert.Equal(t, partMeta.PartitionID, partInfo.PartitionID) + assert.Equal(t, collMeta.ID, ddReq.CollectionID) + assert.Equal(t, partMeta.PartitionID, 
ddReq.PartitionID) }) t.Run("has partition", func(t *testing.T) { @@ -964,25 +954,22 @@ func TestMasterService(t *testing.T) { assert.Equal(t, 2, len(pm.GetCollArray())) assert.Equal(t, "testColl", pm.GetCollArray()[1]) - // check DDMsg type and info - msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + // check DD operation info + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) assert.Nil(t, err) - assert.Equal(t, DropPartitionMsgType, msgType) + assert.Equal(t, "true", flag) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + assert.Nil(t, err) + var ddOp DdOperation + err = json.Unmarshal([]byte(ddOpStr), &ddOp) + assert.Nil(t, err) + assert.Equal(t, DropPartitionDDType, ddOp.Type) - var meta map[string]string - metaStr, err := core.MetaTable.client.Load(DDMsgPrefix) + var ddReq = internalpb.DropPartitionRequest{} + err = proto.UnmarshalText(ddOp.Body, &ddReq) assert.Nil(t, err) - err = json.Unmarshal([]byte(metaStr), &meta) - assert.Nil(t, err) - - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID) - v1 := meta[k1] - var collInfo etcdpb.CollectionInfo - err = proto.UnmarshalText(v1, &collInfo) - assert.Nil(t, err) - assert.Equal(t, collMeta.ID, collInfo.ID) - assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime) - assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0]) + assert.Equal(t, collMeta.ID, ddReq.CollectionID) + assert.Equal(t, dropPartID, ddReq.PartitionID) }) t.Run("drop collection", func(t *testing.T) { @@ -1037,17 +1024,21 @@ func TestMasterService(t *testing.T) { assert.Equal(t, len(collArray), 3) assert.Equal(t, collArray[2], "testColl") - // check DDMsg type and info - msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix) + // check DD operation info + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) assert.Nil(t, err) - assert.Equal(t, DropCollectionMsgType, msgType) + assert.Equal(t, "true", flag) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + 
assert.Nil(t, err) + var ddOp DdOperation + err = json.Unmarshal([]byte(ddOpStr), &ddOp) + assert.Nil(t, err) + assert.Equal(t, DropCollectionDDType, ddOp.Type) - var collID typeutil.UniqueID - collIDByte, err := core.MetaTable.client.Load(DDMsgPrefix) + var ddReq = internalpb.DropCollectionRequest{} + err = proto.UnmarshalText(ddOp.Body, &ddReq) assert.Nil(t, err) - err = json.Unmarshal([]byte(collIDByte), &collID) - assert.Nil(t, err) - assert.Equal(t, collMeta.ID, collID) + assert.Equal(t, collMeta.ID, ddReq.CollectionID) }) t.Run("context_cancel", func(t *testing.T) { @@ -1665,25 +1656,25 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { + c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { return nil } err = c.checkInit() assert.NotNil(t, err) - c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { + c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { return nil } err = c.checkInit() assert.NotNil(t, err) - c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { + c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { return nil } err = c.checkInit() assert.NotNil(t, err) - c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { + c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { return nil } err = c.checkInit() diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 336df1b9e5..402a3ac377 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -12,7 +12,7 @@ package masterservice import ( - "encoding/json" + "errors" "fmt" "path" 
"strconv" @@ -21,16 +21,13 @@ import ( "github.com/golang/protobuf/proto" "go.uber.org/zap" - "errors" - "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/util/typeutil" ) const ( @@ -42,14 +39,13 @@ const ( SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" IndexMetaPrefix = ComponentPrefix + "/index" - DDMsgPrefix = ComponentPrefix + "/dd-msg" - DDMsgTypePrefix = ComponentPrefix + "/dd-msg-type" - DDMsgFlagPrefix = ComponentPrefix + "/dd-msg-flag" + DDOperationPrefix = ComponentPrefix + "/dd-operation" + DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" - CreateCollectionMsgType = "CreateCollection" - DropCollectionMsgType = "DropCollection" - CreatePartitionMsgType = "CreatePartition" - DropPartitionMsgType = "DropPartition" + CreateCollectionDDType = "CreateCollection" + DropCollectionDDType = "DropCollection" + CreatePartitionDDType = "CreatePartition" + DropPartitionDDType = "DropPartition" ) type metaTable struct { @@ -232,7 +228,7 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error { return nil } -func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error { +func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -270,16 +266,11 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn meta[k] = v } - // record ddmsg info and type - ddmsg, err := json.Marshal(meta) - if err != nil { - return err - } - meta[DDMsgPrefix] = string(ddmsg) - meta[DDMsgTypePrefix] = CreateCollectionMsgType - meta[DDMsgFlagPrefix] = 
"false" + // save ddOpStr into etcd + meta[DDOperationPrefix] = ddOpStr + meta[DDMsgSendPrefix] = "false" - err = mt.client.MultiSave(meta) + err := mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return err @@ -288,7 +279,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn return nil } -func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { +func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -333,18 +324,13 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { fmt.Sprintf("%s/%d", IndexMetaPrefix, collID), } - // record ddmsg info and type - ddmsg, err := json.Marshal(collID) - if err != nil { - return err - } - saveMeta := map[string]string{ - DDMsgPrefix: string(ddmsg), - DDMsgTypePrefix: DropCollectionMsgType, - DDMsgFlagPrefix: "false", + // save ddOpStr into etcd + var saveMeta = map[string]string{ + DDOperationPrefix: ddOpStr, + DDMsgSendPrefix: "false", } - err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) + err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) if err != nil { _ = mt.reloadFromKV() return err @@ -416,7 +402,7 @@ func (mt *metaTable) ListCollections() ([]string, error) { return colls, nil } -func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID) error { +func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] @@ -457,16 +443,11 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string v2 := proto.MarshalTextString(&partMeta) meta := map[string]string{k1: v1, k2: v2} - // record ddmsg info and type - ddmsg, err := json.Marshal(meta) - if err != nil { - return err - } - meta[DDMsgPrefix] = string(ddmsg) - 
meta[DDMsgTypePrefix] = CreatePartitionMsgType - meta[DDMsgFlagPrefix] = "false" + // save ddOpStr into etcd + meta[DDOperationPrefix] = ddOpStr + meta[DDMsgSendPrefix] = "false" - err = mt.client.MultiSave(meta) + err := mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return err @@ -474,25 +455,34 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string return nil } +func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) { + collMeta, ok := mt.collID2Meta[collID] + if !ok { + return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID) + } + for _, id := range collMeta.PartitionIDs { + partMeta, ok := mt.partitionID2Meta[id] + if ok && partMeta.PartitionName == partitionName { + return partMeta, nil + } + } + return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName) +} + +func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + return mt.getPartitionByName(collID, partitionName) +} + func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - col, ok := mt.collID2Meta[collID] - if !ok { - return false - } - for _, partitionID := range col.PartitionIDs { - meta, ok := mt.partitionID2Meta[partitionID] - if ok { - if meta.PartitionName == partitionName { - return true - } - } - } - return false + _, err := mt.getPartitionByName(collID, partitionName) + return err == nil } -func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string) (typeutil.UniqueID, error) { +func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -540,7 +530,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, 
partitionName str } delete(mt.segID2IndexMeta, segID) } - collKV := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} + meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)} delMetaKeys := []string{ fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID), } @@ -549,16 +539,11 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str delMetaKeys = append(delMetaKeys, k) } - // record ddmsg info and type - ddmsg, err := json.Marshal(collKV) - if err != nil { - return 0, err - } - collKV[DDMsgPrefix] = string(ddmsg) - collKV[DDMsgTypePrefix] = DropPartitionMsgType - collKV[DDMsgFlagPrefix] = "false" + // save ddOpStr into etcd + meta[DDOperationPrefix] = ddOpStr + meta[DDMsgSendPrefix] = "false" - err = mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys) + err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys) if err != nil { _ = mt.reloadFromKV() return 0, err diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 8b5be630bc..542cce6b14 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -147,6 +147,15 @@ func Test_MockKV(t *testing.T) { } func TestMetaTable(t *testing.T) { + const collID = typeutil.UniqueID(1) + const collIDInvalid = typeutil.UniqueID(2) + const partIDDefault = typeutil.UniqueID(10) + const partID = typeutil.UniqueID(20) + const partIDInvalid = typeutil.UniqueID(21) + const segID = typeutil.UniqueID(100) + const segID2 = typeutil.UniqueID(101) + const fieldID = typeutil.UniqueID(110) + rand.Seed(time.Now().UnixNano()) randVal := rand.Int() Params.Init() @@ -161,14 +170,14 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo := &pb.CollectionInfo{ - ID: 1, + ID: collID, Schema: &schemapb.CollectionSchema{ Name: "testColl", 
Description: "", AutoID: false, Fields: []*schemapb.FieldSchema{ { - FieldID: 110, + FieldID: fieldID, Name: "field110", IsPrimaryKey: false, Description: "", @@ -198,16 +207,21 @@ func TestMetaTable(t *testing.T) { }, FieldIndexes: []*pb.FieldIndexInfo{ { - FiledID: 110, + FiledID: fieldID, IndexID: 10000, }, }, CreateTime: 0, PartitionIDs: nil, } + partInfoDefault := &pb.PartitionInfo{ + PartitionName: "_default", + PartitionID: partIDDefault, + SegmentIDs: nil, + } partInfo := &pb.PartitionInfo{ PartitionName: "testPart", - PartitionID: 10, + PartitionID: partID, SegmentIDs: nil, } idxInfo := []*pb.IndexInfo{ @@ -228,55 +242,69 @@ func TestMetaTable(t *testing.T) { } t.Run("add collection", func(t *testing.T) { - partInfo.SegmentIDs = []int64{100} - err = mt.AddCollection(collInfo, partInfo, idxInfo) + partInfoDefault.SegmentIDs = []int64{segID} + err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.NotNil(t, err) - partInfo.SegmentIDs = []int64{} + partInfoDefault.SegmentIDs = []int64{} - collInfo.PartitionIDs = []int64{100} - err = mt.AddCollection(collInfo, partInfo, idxInfo) + collInfo.PartitionIDs = []int64{segID} + err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.NotNil(t, err) collInfo.PartitionIDs = []int64{} - err = mt.AddCollection(collInfo, partInfo, nil) + err = mt.AddCollection(collInfo, partInfoDefault, nil, "") assert.NotNil(t, err) - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.Nil(t, err) collMeta, err := mt.GetCollectionByName("testColl") assert.Nil(t, err) - assert.Equal(t, collMeta.PartitionIDs[0], int64(10)) + assert.Equal(t, collMeta.PartitionIDs[0], partIDDefault) assert.Equal(t, len(collMeta.PartitionIDs), 1) assert.True(t, mt.HasCollection(collInfo.ID)) field, err := mt.GetFieldSchema("testColl", "field110") assert.Nil(t, err) assert.Equal(t, field.FieldID, collInfo.Schema.Fields[0].FieldID) + + // check DD operation flag 
+ flag, err := mt.client.Load(DDMsgSendPrefix) + assert.Nil(t, err) + assert.Equal(t, "false", flag) + }) + + t.Run("add partition", func(t *testing.T) { + assert.Nil(t, mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "")) + + // check DD operation flag + flag, err := mt.client.Load(DDMsgSendPrefix) + assert.Nil(t, err) + assert.Equal(t, "false", flag) }) t.Run("add segment", func(t *testing.T) { seg := &datapb.SegmentInfo{ - ID: 100, - CollectionID: 1, - PartitionID: 10, + ID: segID, + CollectionID: collID, + PartitionID: partID, } assert.Nil(t, mt.AddSegment(seg)) assert.NotNil(t, mt.AddSegment(seg)) - seg.ID = 101 - seg.CollectionID = 2 + seg.ID = segID2 + seg.CollectionID = collIDInvalid assert.NotNil(t, mt.AddSegment(seg)) - seg.CollectionID = 1 - seg.PartitionID = 11 + seg.CollectionID = collID + seg.PartitionID = partIDInvalid assert.NotNil(t, mt.AddSegment(seg)) - seg.PartitionID = 10 + seg.PartitionID = partID assert.Nil(t, mt.AddSegment(seg)) }) t.Run("add segment index", func(t *testing.T) { seg := pb.SegmentIndexInfo{ - SegmentID: 100, - FieldID: 110, + SegmentID: segID, + FieldID: fieldID, IndexID: 10000, BuildID: 201, } @@ -318,7 +346,7 @@ func TestMetaTable(t *testing.T) { seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo) assert.Nil(t, err) assert.Equal(t, len(seg), 1) - assert.Equal(t, seg[0], int64(101)) + assert.Equal(t, seg[0], segID2) assert.True(t, EqualKeyPairArray(field.TypeParams, tparams)) params = []*commonpb.KeyValuePair{ @@ -334,8 +362,8 @@ func TestMetaTable(t *testing.T) { seg, field, err = mt.GetNotIndexedSegments("testColl", "field110", idxInfo) assert.Nil(t, err) assert.Equal(t, len(seg), 2) - assert.Equal(t, seg[0], int64(100)) - assert.Equal(t, seg[1], int64(101)) + assert.Equal(t, seg[0], segID) + assert.Equal(t, seg[1], segID2) assert.True(t, EqualKeyPairArray(field.TypeParams, tparams)) }) @@ -397,16 +425,31 @@ func TestMetaTable(t *testing.T) { assert.Equal(t, len(idxs), 1) 
assert.Equal(t, idxs[0].IndexID, int64(2001)) - _, err = mt.GetSegmentIndexInfoByID(100, -1, "") + _, err = mt.GetSegmentIndexInfoByID(segID, -1, "") assert.NotNil(t, err) + }) + t.Run("drop partition", func(t *testing.T) { + id, err := mt.DeletePartition(collID, partInfo.PartitionName, "") + assert.Nil(t, err) + assert.Equal(t, partID, id) + + // check DD operation flag + flag, err := mt.client.Load(DDMsgSendPrefix) + assert.Nil(t, err) + assert.Equal(t, "false", flag) }) t.Run("drop collection", func(t *testing.T) { - err := mt.DeleteCollection(2) + err := mt.DeleteCollection(collIDInvalid, "") assert.NotNil(t, err) - err = mt.DeleteCollection(1) + err = mt.DeleteCollection(collID, "") assert.Nil(t, err) + + // check DD operation flag + flag, err := mt.client.Load(DDMsgSendPrefix) + assert.Nil(t, err) + assert.Equal(t, "false", flag) }) /////////////////////////// these tests should run at last, it only used to hit the error lines //////////////////////// @@ -421,7 +464,7 @@ func TestMetaTable(t *testing.T) { return fmt.Errorf("multi save error") } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo) + err := mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") }) @@ -434,11 +477,11 @@ func TestMetaTable(t *testing.T) { return fmt.Errorf("milti save and remove with prefix error") } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo) + err := mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo) mt.indexID2Meta = make(map[int64]pb.IndexInfo) - err = mt.DeleteCollection(collInfo.ID) + err = mt.DeleteCollection(collInfo.ID, "") assert.NotNil(t, err) assert.EqualError(t, err, "milti save and remove with prefix error") }) @@ -449,13 +492,13 @@ func TestMetaTable(t *testing.T) { } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo) + 
err := mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) seg := &datapb.SegmentInfo{ ID: 100, - CollectionID: 1, - PartitionID: 10, + CollectionID: collID, + PartitionID: partID, } assert.Nil(t, mt.AddSegment(seg)) @@ -485,17 +528,17 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - err = mt.AddPartition(2, "no-part", 22) + err = mt.AddPartition(2, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, "can't find collection. id = 2") coll := mt.collID2Meta[collInfo.ID] coll.PartitionIDs = make([]int64, Params.MaxPartitionNum) mt.collID2Meta[coll.ID] = coll - err = mt.AddPartition(coll.ID, "no-part", 22) + err = mt.AddPartition(coll.ID, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)) @@ -505,7 +548,7 @@ func TestMetaTable(t *testing.T) { mockKV.multiSave = func(kvs map[string]string) error { return fmt.Errorf("multi save error") } - err = mt.AddPartition(coll.ID, "no-part", 22) + err = mt.AddPartition(coll.ID, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") @@ -513,13 +556,13 @@ func TestMetaTable(t *testing.T) { return nil } collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22) + err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName)) - err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID) + err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "") assert.NotNil(t, err) assert.EqualError(t, err, 
fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID)) }) @@ -535,7 +578,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) @@ -556,14 +599,14 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName) + _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "") assert.NotNil(t, err) assert.EqualError(t, err, "default partition cannot be deleted") - _, err = mt.DeletePartition(collInfo.ID, "abc") + _, err = mt.DeletePartition(collInfo.ID, "abc", "") assert.NotNil(t, err) assert.EqualError(t, err, "partition abc does not exist") @@ -573,12 +616,12 @@ func TestMetaTable(t *testing.T) { mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error { return fmt.Errorf("multi save and remove with prefix error") } - _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName) + _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save and remove with prefix error") mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.DeletePartition(collInfo.ID, "abc") + _, err = mt.DeletePartition(collInfo.ID, "abc", "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID)) @@ -598,7 +641,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) noPart := pb.PartitionInfo{ @@ -644,13 +687,13 @@ func 
TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) seg := &datapb.SegmentInfo{ ID: 100, - CollectionID: 1, - PartitionID: 10, + CollectionID: collID, + PartitionID: partID, } assert.Nil(t, mt.AddSegment(seg)) @@ -683,7 +726,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) assert.Nil(t, mt.AddSegment(seg)) @@ -710,7 +753,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) _, _, err = mt.DropIndex("abc", "abc", "abc") @@ -747,7 +790,7 @@ func TestMetaTable(t *testing.T) { err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error { @@ -772,7 +815,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) _, err = mt.GetSegmentIndexInfoByID(101, 101, "abc") @@ -793,8 +836,8 @@ func TestMetaTable(t *testing.T) { segInfo := &datapb.SegmentInfo{ ID: 100, - CollectionID: 1, - PartitionID: 10, + CollectionID: collID, + PartitionID: partID, } assert.Nil(t, mt.AddSegment(segInfo)) segIdx := &pb.SegmentIndexInfo{ @@ -831,7 +874,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, 
partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -902,7 +945,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx) @@ -927,7 +970,7 @@ func TestMetaTable(t *testing.T) { return nil } collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) coll := mt.collID2Meta[collInfo.ID] coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1}) @@ -975,7 +1018,7 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo) + err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) _, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName) diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index ab3254a7f0..d6021f52c9 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -122,11 +122,11 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - partitionID, _, err := t.core.idAllocator(1) + partID, _, err := t.core.idAllocator(1) if err != nil { return err } - collMeta := etcdpb.CollectionInfo{ + collInfo := etcdpb.CollectionInfo{ ID: collID, Schema: &schema, CreateTime: collTs, @@ -135,9 +135,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { } // every collection has _default partition - partMeta := etcdpb.PartitionInfo{ + partInfo := etcdpb.PartitionInfo{ PartitionName: 
Params.DefaultPartitionName, - PartitionID: partitionID, + PartitionID: partID, SegmentIDs: make([]typeutil.UniqueID, 0, 16), } idxInfo := make([]*etcdpb.IndexInfo, 0, 16) @@ -164,11 +164,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { // } //} - err = t.core.MetaTable.AddCollection(&collMeta, &partMeta, idxInfo) - if err != nil { - return err - } - // schema is modified (add RowIDField and TimestampField), // so need Marshal again schemaBytes, err := proto.Marshal(&schema) @@ -176,7 +171,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return err } - ddReq := internalpb.CreateCollectionRequest{ + ddCollReq := internalpb.CreateCollectionRequest{ Base: t.Req.Base, DbName: t.Req.DbName, CollectionName: t.Req.CollectionName, @@ -185,12 +180,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { Schema: schemaBytes, } - err = t.core.DdCreateCollectionReq(ctx, &ddReq) - if err != nil { - return err - } - - ddPart := internalpb.CreatePartitionRequest{ + ddPartReq := internalpb.CreatePartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_CreatePartition, MsgID: t.Req.Base.MsgID, //TODO, msg id @@ -201,19 +191,33 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { CollectionName: t.Req.CollectionName, PartitionName: Params.DefaultPartitionName, DbID: 0, //TODO, not used - CollectionID: collMeta.ID, - PartitionID: partMeta.PartitionID, + CollectionID: collInfo.ID, + PartitionID: partInfo.PartitionID, } - err = t.core.DdCreatePartitionReq(ctx, &ddPart) + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddOpStr, err := EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType) if err != nil { return err } - // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully - t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + err = t.core.MetaTable.AddCollection(&collInfo, 
&partInfo, idxInfo, ddOpStr) + if err != nil { + return err + } - return nil + err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq) + if err != nil { + return err + } + err = t.core.SendDdCreatePartitionReq(ctx, &ddPartReq) + if err != nil { + return err + } + + // Update DDOperation in etcd + return t.core.setDdMsgSendFlag(true) } type DropCollectionReqTask struct { @@ -246,16 +250,6 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - if err = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil { - return err - } - - err = t.core.MetaTable.DeleteCollection(collMeta.ID) - if err != nil { - return err - } - - //data service should drop segments , which belong to this collection, from the segment manager ddReq := internalpb.DropCollectionRequest{ Base: t.Req.Base, @@ -265,7 +259,19 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { CollectionID: collMeta.ID, } - err = t.core.DdDropCollectionReq(ctx, &ddReq) + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropCollectionDDType) + if err != nil { + return err + } + + err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr) + if err != nil { + return err + } + + err = t.core.SendDdDropCollectionReq(ctx, &ddReq) if err != nil { return err } @@ -277,10 +283,11 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { } }() - // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully - t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") + // error doesn't matter here + t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) - return nil + // Update DDOperation in etcd + return t.core.setDdMsgSendFlag(true) } type HasCollectionReqTask struct { @@ -434,11 +441,7 @@ func (t 
*CreatePartitionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - partitionID, _, err := t.core.idAllocator(1) - if err != nil { - return err - } - err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partitionID) + partID, _, err := t.core.idAllocator(1) if err != nil { return err } @@ -450,21 +453,31 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { PartitionName: t.Req.PartitionName, DbID: 0, // todo, not used CollectionID: collMeta.ID, - PartitionID: partitionID, + PartitionID: partID, } - err = t.core.DdCreatePartitionReq(ctx, &ddReq) + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddOpStr, err := EncodeDdOperation(&ddReq, nil, CreatePartitionDDType) + if err != nil { + return err + } + + err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr) + if err != nil { + return err + } + + err = t.core.SendDdCreatePartitionReq(ctx, &ddReq) if err != nil { return err } // error doesn't matter here - _ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) - // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully - t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") - - return nil + // Update DDOperation in etcd + return t.core.setDdMsgSendFlag(true) } type DropPartitionReqTask struct { @@ -492,11 +505,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropPartition { return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err } - partID, err := 
t.core.MetaTable.DeletePartition(coll.ID, t.Req.PartitionName) + partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName) if err != nil { return err } @@ -507,22 +520,32 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { CollectionName: t.Req.CollectionName, PartitionName: t.Req.PartitionName, DbID: 0, //todo,not used - CollectionID: coll.ID, - PartitionID: partID, + CollectionID: collInfo.ID, + PartitionID: partInfo.PartitionID, } - err = t.core.DdDropPartitionReq(ctx, &ddReq) + // build DdOperation and save it into etcd, when ddmsg send fail, + // system can restore ddmsg from etcd and re-send + ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropPartitionDDType) + if err != nil { + return err + } + + _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr) + if err != nil { + return err + } + + err = t.core.SendDdDropPartitionReq(ctx, &ddReq) if err != nil { return err } // error doesn't matter here - _ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) - // Marking DDMsgFlagPrefix to true means ddMsg has been send successfully - t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true") - - return nil + // Update DDOperation in etcd + return t.core.setDdMsgSendFlag(true) } type HasPartitionReqTask struct { diff --git a/internal/masterservice/util.go b/internal/masterservice/util.go index 2ac5e927e7..4fbd5be69d 100644 --- a/internal/masterservice/util.go +++ b/internal/masterservice/util.go @@ -12,8 +12,10 @@ package masterservice import ( + "encoding/json" "fmt" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -66,3 +68,19 @@ func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID 
typeutil.UniqueI } return GetFieldSchemaByID(coll, fieldID) } + +// EncodeDdOperation serializes a DdOperation into a JSON string: Body and Body1 carry the protobuf text form of m and m1, and Type carries ddType. NOTE(review): the drop-collection and create/drop-partition callers visible in this patch pass nil for m1 - confirm the decoder ignores Body1 for those types. +func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string, error) { + mStr := proto.MarshalTextString(m) + m1Str := proto.MarshalTextString(m1) + ddOp := DdOperation{ + Body: mStr, + Body1: m1Str, // used for DdCreateCollection only + Type: ddType, + } + ddOpByte, err := json.Marshal(ddOp) + if err != nil { + return "", err + } + return string(ddOpByte), nil +}