Fix CreateAlias/DropAlias/AlterAlias impl (#9701)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/9719/head
Cai Yudong 2021-10-12 17:40:34 +08:00 committed by GitHub
parent 13c1a1c746
commit 45340835a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 188 deletions

View File

@ -70,15 +70,6 @@ const (
// DropPartitionDDType name of DD type for drop partition
DropPartitionDDType = "DropPartition"
// CreateAliasDDType name of DD type for create collection alias
CreateAliasDDType = "CreateAlias"
// DropAliasDDType name of DD type for drop collection alias
DropAliasDDType = "DropAlias"
// AlterAliasDDType name of DD type for alter collection alias
AlterAliasDDType = "AlterAlias"
)
// MetaTable store all rootcoord meta info
@ -1225,8 +1216,7 @@ func (mt *MetaTable) dupMeta() (
}
// AddAlias add collection alias
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collAlias2ID[collectionAlias]; ok {
@ -1243,30 +1233,24 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
}
mt.collAlias2ID[collectionAlias] = id
meta := make(map[string]string)
addition := mt.getAdditionKV(ddOpStr, meta)
saveAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AddAlias saveAlias Marshal CollectionInfo fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddAlias saveAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AddAlias Marshal CollectionInfo fail",
zap.String("key", k), zap.Error(err))
return fmt.Errorf("MetaTable AddAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
}
err := mt.client.MultiSave(meta, ts, addition, saveAlias)
err = mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
}
return nil
}
// DeleteAlias delete collection alias
func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// DropAlias drop collection alias
func (mt *MetaTable) DropAlias(collectionAlias string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
@ -1278,17 +1262,16 @@ func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp,
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias),
}
meta := make(map[string]string)
addition := mt.getAdditionKV(ddOpStr, meta)
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts, addition)
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
}
return nil
}
// AlterAlias alter collection alias
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
@ -1300,24 +1283,19 @@ func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, t
return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
}
mt.collAlias2ID[collectionAlias] = id
meta := make(map[string]string)
addition := mt.getAdditionKV(ddOpStr, meta)
alterAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AlterAlias Marshal CollectionInfo fail",
zap.String("key", k), zap.Error(err))
return fmt.Errorf("MetaTable AlterAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
}
err := mt.client.MultiSave(meta, ts, addition, alterAlias)
err = mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
}
return nil
}

View File

@ -149,20 +149,23 @@ func Test_MockKV(t *testing.T) {
func TestMetaTable(t *testing.T) {
const (
collID = typeutil.UniqueID(1)
collName = "testColl"
collIDInvalid = typeutil.UniqueID(2)
partIDDefault = typeutil.UniqueID(10)
partID = typeutil.UniqueID(20)
partName = "testPart"
partIDInvalid = typeutil.UniqueID(21)
segID = typeutil.UniqueID(100)
segID2 = typeutil.UniqueID(101)
fieldID = typeutil.UniqueID(110)
fieldID2 = typeutil.UniqueID(111)
indexID = typeutil.UniqueID(10000)
indexID2 = typeutil.UniqueID(10001)
buildID = typeutil.UniqueID(201)
collName = "testColl"
collNameInvalid = "testColl_invalid"
aliasName1 = "alias1"
aliasName2 = "alias2"
collID = typeutil.UniqueID(1)
collIDInvalid = typeutil.UniqueID(2)
partIDDefault = typeutil.UniqueID(10)
partID = typeutil.UniqueID(20)
partName = "testPart"
partIDInvalid = typeutil.UniqueID(21)
segID = typeutil.UniqueID(100)
segID2 = typeutil.UniqueID(101)
fieldID = typeutil.UniqueID(110)
fieldID2 = typeutil.UniqueID(111)
indexID = typeutil.UniqueID(10000)
indexID2 = typeutil.UniqueID(10001)
buildID = typeutil.UniqueID(201)
)
rand.Seed(time.Now().UnixNano())
@ -188,9 +191,8 @@ func TestMetaTable(t *testing.T) {
collInfo := &pb.CollectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Name: collName,
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
@ -262,7 +264,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, uint64(1), ts)
collMeta, err := mt.GetCollectionByName("testColl", 0)
collMeta, err := mt.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, collMeta.CreateTime, ts)
assert.Equal(t, collMeta.PartitionCreatedTimestamps[0], ts)
@ -271,7 +273,7 @@ func TestMetaTable(t *testing.T) {
assert.Equal(t, 1, len(collMeta.PartitionIDs))
assert.True(t, mt.HasCollection(collInfo.ID, 0))
field, err := mt.GetFieldSchema("testColl", "field110")
field, err := mt.GetFieldSchema(collName, "field110")
assert.Nil(t, err)
assert.Equal(t, collInfo.Schema.Fields[0].FieldID, field.FieldID)
@ -283,25 +285,27 @@ func TestMetaTable(t *testing.T) {
t.Run("add alias", func(t *testing.T) {
ts := ftso()
exists := mt.IsAlias("alias1")
exists := mt.IsAlias(aliasName1)
assert.False(t, exists)
err = mt.AddAlias("alias1", "testColl", ts, ddOp)
err = mt.AddAlias(aliasName1, collName, ts)
assert.Nil(t, err)
aliases := mt.ListAliases(collID)
assert.Equal(t, aliases, []string{"alias1"})
exists = mt.IsAlias("alias1")
assert.Equal(t, aliases, []string{aliasName1})
exists = mt.IsAlias(aliasName1)
assert.True(t, exists)
})
t.Run("alter alias", func(t *testing.T) {
ts := ftso()
err = mt.AlterAlias("alias1", "testColl", ts, ddOp)
err = mt.AlterAlias(aliasName1, collName, ts)
assert.Nil(t, err)
err = mt.AlterAlias(aliasName1, collNameInvalid, ts)
assert.NotNil(t, err)
})
t.Run("delete alias", func(t *testing.T) {
ts := ftso()
err = mt.DeleteAlias("alias1", ts, ddOp)
err = mt.DropAlias(aliasName1, ts)
assert.Nil(t, err)
})
@ -375,7 +379,7 @@ func TestMetaTable(t *testing.T) {
_, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo, nil, 0)
assert.NotNil(t, err)
seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
seg, field, err := mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(seg))
assert.Equal(t, segID2, seg[0])
@ -391,7 +395,7 @@ func TestMetaTable(t *testing.T) {
idxInfo.IndexID = 2001
idxInfo.IndexName = "field110-1"
seg, field, err = mt.GetNotIndexedSegments("testColl", "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
seg, field, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
assert.Nil(t, err)
assert.Equal(t, 2, len(seg))
assert.Equal(t, segID, seg[0])
@ -401,7 +405,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("get index by name", func(t *testing.T) {
_, idx, err := mt.GetIndexByName("testColl", "field110")
_, idx, err := mt.GetIndexByName(collName, "field110")
assert.Nil(t, err)
assert.Equal(t, 1, len(idx))
assert.Equal(t, indexID, idx[0].IndexID)
@ -417,7 +421,7 @@ func TestMetaTable(t *testing.T) {
}
assert.True(t, EqualKeyPairArray(idx[0].IndexParams, params))
_, idx, err = mt.GetIndexByName("testColl", "idx201")
_, idx, err = mt.GetIndexByName(collName, "idx201")
assert.Nil(t, err)
assert.Zero(t, len(idx))
})
@ -439,20 +443,20 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop index", func(t *testing.T) {
idx, ok, err := mt.DropIndex("testColl", "field110", "field110", 0)
idx, ok, err := mt.DropIndex(collName, "field110", "field110", 0)
assert.Nil(t, err)
assert.True(t, ok)
assert.Equal(t, indexID, idx)
_, ok, err = mt.DropIndex("testColl", "field110", "field110-error", 0)
_, ok, err = mt.DropIndex(collName, "field110", "field110-error", 0)
assert.Nil(t, err)
assert.False(t, ok)
_, idxs, err := mt.GetIndexByName("testColl", "field110")
_, idxs, err := mt.GetIndexByName(collName, "field110")
assert.Nil(t, err)
assert.Zero(t, len(idxs))
_, idxs, err = mt.GetIndexByName("testColl", "field110-1")
_, idxs, err = mt.GetIndexByName(collName, "field110-1")
assert.Nil(t, err)
assert.Equal(t, len(idxs), 1)
assert.Equal(t, idxs[0].IndexID, int64(2001))
@ -478,12 +482,12 @@ func TestMetaTable(t *testing.T) {
err = mt.DeleteCollection(collIDInvalid, ts, nil)
assert.NotNil(t, err)
ts2 := ftso()
err = mt.AddAlias("alias1", "testColl", ts2, ddOp)
err = mt.AddAlias(aliasName2, collName, ts2)
assert.Nil(t, err)
err = mt.DeleteCollection(collID, ts, nil)
assert.Nil(t, err)
ts3 := ftso()
err = mt.DeleteAlias("alias1", ts3, ddOp)
err = mt.DropAlias(aliasName2, ts3)
assert.NotNil(t, err)
// check DD operation flag

View File

@ -976,48 +976,16 @@ func (t *CreateAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
ddReq := internalpb.CreateAliasRequest{
Base: t.Req.Base,
CollectionName: t.Req.CollectionName,
Alias: t.Req.Alias,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, CreateAliasDDType)
}
reason := fmt.Sprintf("create alias %s", t.Req.Alias)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
createAliasFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.AddAlias(t.Req.Alias, t.Req.CollectionName, ts, ddOp)
if err != nil {
return err
}
return t.core.SendTimeTick(ts, reason)
}
err = createAliasFn()
err = t.core.MetaTable.AddAlias(t.Req.Alias, t.Req.CollectionName, ts)
if err != nil {
return err
return fmt.Errorf("meta table add alias failed, error = %w", err)
}
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
return nil
}
// DropAliasReqTask drop alias request task
@ -1037,44 +1005,13 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
ddReq := internalpb.DropAliasRequest{
Base: t.Req.Base,
Alias: t.Req.Alias,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, DropAliasDDType)
}
reason := fmt.Sprintf("create alias %s", t.Req.Alias)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
dropAliasFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.DeleteAlias(t.Req.Alias, ts, ddOp)
if err != nil {
return err
}
t.core.SendTimeTick(ts, reason)
return nil
}
err = dropAliasFn()
err = t.core.MetaTable.DropAlias(t.Req.Alias, ts)
if err != nil {
return err
return fmt.Errorf("meta table drop alias failed, error = %w", err)
}
req := proxypb.InvalidateCollMetaCacheRequest{
@ -1089,8 +1026,7 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error {
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
return nil
}
// AlterAliasReqTask alter alias request task
@ -1110,44 +1046,13 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("alter alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
ddReq := internalpb.DropAliasRequest{
Base: t.Req.Base,
Alias: t.Req.Alias,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, AlterAliasDDType)
}
reason := fmt.Sprintf("alter alias %s", t.Req.Alias)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
alterAliasFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.AlterAlias(t.Req.Alias, t.Req.CollectionName, ts, ddOp)
if err != nil {
return err
}
t.core.SendTimeTick(ts, reason)
return nil
}
err = alterAliasFn()
err = t.core.MetaTable.AlterAlias(t.Req.Alias, t.Req.CollectionName, ts)
if err != nil {
return err
return fmt.Errorf("meta table alter alias failed, error = %w", err)
}
req := proxypb.InvalidateCollMetaCacheRequest{
@ -1162,6 +1067,5 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
return nil
}