Save ddmsg info and type into ETCD (#5173)

Save ddmsg info and type into ETCD to support re-sending ddmsg
when the system restarts or recovers from a failure (#5172)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5188/head
Cai Yudong 2021-05-12 15:33:53 +08:00 committed by GitHub
parent d543e2c6cc
commit 7b83b11e95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 156 additions and 11 deletions

View File

@ -13,6 +13,7 @@ package masterservice
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sync"
@ -24,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
@ -403,6 +405,26 @@ func TestMasterService(t *testing.T) {
createMeta, err = core.MetaTable.GetCollectionByName("testColl-again")
assert.Nil(t, err)
assert.Equal(t, createMsg.CollectionID, createMeta.ID)
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
assert.Nil(t, err)
assert.Equal(t, CreateCollectionMsgType, msgType)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
assert.Nil(t, err)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, createMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, collInfo.ID)
assert.Equal(t, createMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, createMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
})
t.Run("has collection", func(t *testing.T) {
@ -524,6 +546,34 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, 1, len(pm.GetCollArray()))
assert.Equal(t, "testColl", pm.GetCollArray()[0])
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
assert.Nil(t, err)
assert.Equal(t, CreatePartitionMsgType, msgType)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
assert.Nil(t, err)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collInfo.ID)
assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
k2 := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collMeta.ID, partMeta.PartitionID)
v2 := meta[k2]
var partInfo etcdpb.PartitionInfo
err = proto.UnmarshalText(v2, &partInfo)
assert.Nil(t, err)
assert.Equal(t, partMeta.PartitionName, partInfo.PartitionName)
assert.Equal(t, partMeta.PartitionID, partInfo.PartitionID)
})
t.Run("has partition", func(t *testing.T) {
@ -913,6 +963,26 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, 2, len(pm.GetCollArray()))
assert.Equal(t, "testColl", pm.GetCollArray()[1])
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
assert.Nil(t, err)
assert.Equal(t, DropPartitionMsgType, msgType)
var meta map[string]string
metaStr, err := core.MetaTable.client.Load(DDMsgPrefix)
assert.Nil(t, err)
err = json.Unmarshal([]byte(metaStr), &meta)
assert.Nil(t, err)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collMeta.ID)
v1 := meta[k1]
var collInfo etcdpb.CollectionInfo
err = proto.UnmarshalText(v1, &collInfo)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collInfo.ID)
assert.Equal(t, collMeta.CreateTime, collInfo.CreateTime)
assert.Equal(t, collMeta.PartitionIDs[0], collInfo.PartitionIDs[0])
})
t.Run("drop collection", func(t *testing.T) {
@ -966,6 +1036,18 @@ func TestMasterService(t *testing.T) {
collArray = pm.GetCollArray()
assert.Equal(t, len(collArray), 3)
assert.Equal(t, collArray[2], "testColl")
// check DDMsg type and info
msgType, err := core.MetaTable.client.Load(DDMsgTypePrefix)
assert.Nil(t, err)
assert.Equal(t, DropCollectionMsgType, msgType)
var collID typeutil.UniqueID
collIDByte, err := core.MetaTable.client.Load(DDMsgPrefix)
assert.Nil(t, err)
err = json.Unmarshal([]byte(collIDByte), &collID)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, collID)
})
t.Run("context_cancel", func(t *testing.T) {

View File

@ -12,6 +12,7 @@
package masterservice
import (
"encoding/json"
"fmt"
"path"
"strconv"
@ -40,6 +41,15 @@ const (
PartitionMetaPrefix = ComponentPrefix + "/partition"
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
IndexMetaPrefix = ComponentPrefix + "/index"
DDMsgPrefix = ComponentPrefix + "/dd-msg"
DDMsgTypePrefix = ComponentPrefix + "/dd-msg-type"
DDMsgFlagPrefix = ComponentPrefix + "/dd-msg-flag"
CreateCollectionMsgType = "CreateCollection"
DropCollectionMsgType = "DropCollection"
CreatePartitionMsgType = "CreatePartition"
DropPartitionMsgType = "DropPartition"
)
type metaTable struct {
@ -229,7 +239,6 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
if len(part.SegmentIDs) != 0 {
return errors.New("segment should be empty when creating collection")
}
if len(coll.PartitionIDs) != 0 {
return errors.New("partitions should be empty when creating collection")
}
@ -261,11 +270,21 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
meta[k] = v
}
err := mt.client.MultiSave(meta)
// record ddmsg info and type
ddmsg, err := json.Marshal(meta)
if err != nil {
return err
}
meta[DDMsgPrefix] = string(ddmsg)
meta[DDMsgTypePrefix] = CreateCollectionMsgType
meta[DDMsgFlagPrefix] = "false"
err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
@ -307,13 +326,25 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error {
}
delete(mt.indexID2Meta, idxInfo.IndexID)
}
metas := []string{
delMetakeys := []string{
fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
fmt.Sprintf("%s/%d", PartitionMetaPrefix, collID),
fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
}
err := mt.client.MultiRemoveWithPrefix(metas)
// record ddmsg info and type
ddmsg, err := json.Marshal(collID)
if err != nil {
return err
}
saveMeta := map[string]string{
DDMsgPrefix: string(ddmsg),
DDMsgTypePrefix: DropCollectionMsgType,
DDMsgFlagPrefix: "false",
}
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -426,8 +457,16 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
v2 := proto.MarshalTextString(&partMeta)
meta := map[string]string{k1: v1, k2: v2}
err := mt.client.MultiSave(meta)
// record ddmsg info and type
ddmsg, err := json.Marshal(meta)
if err != nil {
return err
}
meta[DDMsgPrefix] = string(ddmsg)
meta[DDMsgTypePrefix] = CreatePartitionMsgType
meta[DDMsgFlagPrefix] = "false"
err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
@ -480,9 +519,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
partMeta = pm
exist = true
}
}
}
if !exist {
return 0, fmt.Errorf("partition %s does not exist", partitionName)
@ -512,8 +549,16 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
delMetaKeys = append(delMetaKeys, k)
}
err := mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys)
// record ddmsg info and type
ddmsg, err := json.Marshal(collKV)
if err != nil {
return 0, err
}
collKV[DDMsgPrefix] = string(ddmsg)
collKV[DDMsgTypePrefix] = DropPartitionMsgType
collKV[DDMsgFlagPrefix] = "false"
err = mt.client.MultiSaveAndRemoveWithPrefix(collKV, delMetaKeys)
if err != nil {
_ = mt.reloadFromKV()
return 0, err

View File

@ -430,8 +430,8 @@ func TestMetaTable(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string) error {
return nil
}
mockKV.multiRemoveWithPrefix = func(keys []string) error {
return fmt.Errorf("milti remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) error {
return fmt.Errorf("milti save and remove with prefix error")
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo)
@ -440,7 +440,7 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
err = mt.DeleteCollection(collInfo.ID)
assert.NotNil(t, err)
assert.EqualError(t, err, "milti remove with prefix error")
assert.EqualError(t, err, "milti save and remove with prefix error")
})
t.Run("get collection failed", func(t *testing.T) {

View File

@ -133,6 +133,8 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
PartitionIDs: make([]typeutil.UniqueID, 0, 16),
FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
}
// every collection has _default partition
partMeta := etcdpb.PartitionInfo{
PartitionName: Params.DefaultPartitionName,
PartitionID: partitionID,
@ -166,6 +168,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
// schema has been modified (RowIDField and TimestampField were added),
// so it needs to be marshaled again
schemaBytes, err := proto.Marshal(&schema)
if err != nil {
return err
@ -205,6 +210,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err
}
// Marking DDMsgFlagPrefix as true means ddMsg has been sent successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
}
@ -269,6 +277,9 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
}
}()
// Marking DDMsgFlagPrefix as true means ddMsg has been sent successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
}
@ -450,6 +461,9 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
// error doesn't matter here
_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Marking DDMsgFlagPrefix as true means ddMsg has been sent successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
}
@ -504,6 +518,10 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
// error doesn't matter here
_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Marking DDMsgFlagPrefix as true means ddMsg has been sent successfully
t.core.MetaTable.client.Save(DDMsgFlagPrefix, "true")
return nil
}