Simplify meta_snapshot interface (#9778)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/9797/head
Cai Yudong 2021-10-13 15:54:33 +08:00 committed by GitHub
parent a63ef91c74
commit c7566345eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 162 additions and 273 deletions

View File

@ -65,7 +65,7 @@ type MetaKv interface {
type SnapShotKV interface {
Save(key string, value string, ts typeutil.Timestamp) error
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
MultiSave(kvs map[string]string, ts typeutil.Timestamp) error
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error
}

View File

@ -322,26 +322,18 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error)
return string(resp.Kvs[0].Value), nil
}
func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(kvs)+2)
ops := make([]clientv3.Op, 0, len(kvs)+1)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, addition := range additions {
if addition == nil {
continue
}
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
@ -350,6 +342,7 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp,
ms.putTs(resp.Header.Revision, ts)
return nil
}
func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
@ -387,26 +380,18 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str
return keys, values, nil
}
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2)
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, addition := range additions {
if addition == nil {
continue
}
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}
}
for _, key := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix()))
}

View File

@ -220,20 +220,6 @@ func (mt *MetaTable) reloadFromKV() error {
return nil
}
func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
if op == nil {
return nil
}
meta[DDMsgSendPrefix] = "false"
return func(ts typeutil.Timestamp) (string, string, error) {
val, err := op(ts)
if err != nil {
return "", "", err
}
return DDOperationPrefix, val, nil
}
}
// AddTenant add tenant
func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
mt.tenantLock.Lock()
@ -277,7 +263,7 @@ func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
}
// AddCollection add collection
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -293,11 +279,24 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
return fmt.Errorf("incorrect index id when creating collection")
}
coll.CreateTime = ts
if len(coll.PartitionCreatedTimestamps) == 1 {
coll.PartitionCreatedTimestamps[0] = ts
}
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
for _, i := range idx {
mt.indexID2Meta[i.IndexID] = *i
}
meta := make(map[string]string)
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1, err := proto.Marshal(coll)
if err != nil {
log.Error("MetaTable AddCollection saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return fmt.Errorf("MetaTable AddCollection Marshal fail key:%s, err:%w", k1, err)
}
meta := map[string]string{k1: string(v1)}
for _, i := range idx {
k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
@ -311,26 +310,10 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
}
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
saveColl := func(ts typeutil.Timestamp) (string, string, error) {
coll.CreateTime = ts
if len(coll.PartitionCreatedTimestamps) == 1 {
coll.PartitionCreatedTimestamps[0] = ts
}
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1, err := proto.Marshal(coll)
if err != nil {
log.Error("MetaTable AddCollection saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddCollection saveColl Marshal fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
}
meta[DDMsgSendPrefix] = "false"
meta[DDOperationPrefix] = ddOpStr
err := mt.client.MultiSave(meta, ts, addition, saveColl)
err = mt.client.MultiSave(meta, ts)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
@ -340,7 +323,7 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
}
// DeleteCollection delete collection
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -396,9 +379,12 @@ func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time
}
// save ddOpStr into etcd
var saveMeta = map[string]string{}
addition := mt.getAdditionKV(ddOpStr, saveMeta)
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition)
var saveMeta = map[string]string{
DDMsgSendPrefix: "false",
DDOperationPrefix: ddOpStr,
}
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
@ -551,7 +537,7 @@ func (mt *MetaTable) ListCollectionPhysicalChannels() []string {
}
// AddPartition add partition
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
@ -585,30 +571,26 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string
}
// no necessary to check created timestamp
}
meta := make(map[string]string)
coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
coll.PartitionNames = append(coll.PartitionNames, partitionName)
coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1, err := proto.Marshal(&coll)
if err != nil {
log.Error("MetaTable AddPartition saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return fmt.Errorf("MetaTable AddPartition Marshal fail, k1:%s, err:%w", k1, err)
}
meta := map[string]string{k1: string(v1)}
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
meta[DDMsgSendPrefix] = "false"
meta[DDOperationPrefix] = ddOpStr
saveColl := func(ts typeutil.Timestamp) (string, string, error) {
coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
coll.PartitionNames = append(coll.PartitionNames, partitionName)
coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1, err := proto.Marshal(&coll)
if err != nil {
log.Error("MetaTable AddPartition saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddPartition saveColl Marshal fail, k1:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
}
err := mt.client.MultiSave(meta, ts, addition, saveColl)
err = mt.client.MultiSave(meta, ts)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
@ -697,7 +679,7 @@ func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string
}
// DeletePartition delete partition
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr string) (typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -758,9 +740,10 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
meta[DDMsgSendPrefix] = "false"
meta[DDOperationPrefix] = ddOpStr
err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")

View File

@ -31,8 +31,8 @@ type mockTestKV struct {
loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error)
save func(key, value string, ts typeutil.Timestamp) error
multiSave func(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error
multiSave func(kvs map[string]string, ts typeutil.Timestamp) error
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
}
func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
@ -46,12 +46,12 @@ func (m *mockTestKV) Save(key, value string, ts typeutil.Timestamp) error {
return m.save(key, value, ts)
}
func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
return m.multiSave(kvs, ts, additions...)
func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
return m.multiSave(kvs, ts)
}
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
return m.multiSaveAndRemoveWithPrefix(saves, removals, ts, additions...)
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return m.multiSaveAndRemoveWithPrefix(saves, removals, ts)
}
func Test_MockKV(t *testing.T) {
@ -251,16 +251,12 @@ func TestMetaTable(t *testing.T) {
},
}
ddOp := func(ts typeutil.Timestamp) (string, error) {
return "", nil
}
t.Run("add collection", func(t *testing.T) {
ts := ftso()
err = mt.AddCollection(collInfo, ts, nil, ddOp)
err = mt.AddCollection(collInfo, ts, nil, "")
assert.NotNil(t, err)
err = mt.AddCollection(collInfo, ts, idxInfo, ddOp)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
assert.Equal(t, uint64(1), ts)
@ -311,7 +307,7 @@ func TestMetaTable(t *testing.T) {
t.Run("add partition", func(t *testing.T) {
ts := ftso()
err = mt.AddPartition(collID, partName, partID, ts, ddOp)
err = mt.AddPartition(collID, partName, partID, ts, "")
assert.Nil(t, err)
//assert.Equal(t, ts, uint64(2))
@ -467,7 +463,7 @@ func TestMetaTable(t *testing.T) {
t.Run("drop partition", func(t *testing.T) {
ts := ftso()
id, err := mt.DeletePartition(collID, partName, ts, nil)
id, err := mt.DeletePartition(collID, partName, ts, "")
assert.Nil(t, err)
assert.Equal(t, partID, id)
@ -479,12 +475,12 @@ func TestMetaTable(t *testing.T) {
t.Run("drop collection", func(t *testing.T) {
ts := ftso()
err = mt.DeleteCollection(collIDInvalid, ts, nil)
err = mt.DeleteCollection(collIDInvalid, ts, "")
assert.NotNil(t, err)
ts2 := ftso()
err = mt.AddAlias(aliasName2, collName, ts2)
assert.Nil(t, err)
err = mt.DeleteCollection(collID, ts, nil)
err = mt.DeleteCollection(collID, ts, "")
assert.Nil(t, err)
ts3 := ftso()
err = mt.DropAlias(aliasName2, ts3)
@ -504,39 +500,27 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, nil) })
assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, "") })
//err = mt.AddCollection(collInfo, 0, idxInfo, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
})
t.Run("delete collection failed", func(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save and remove with prefix error")
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
ts = ftso()
assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, nil) })
assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, "") })
//err = mt.DeleteCollection(collInfo.ID, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save and remove with prefix error")
@ -551,7 +535,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -575,18 +559,18 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
ts = ftso()
err = mt.AddPartition(2, "no-part", 22, ts, nil)
err = mt.AddPartition(2, "no-part", 22, ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil)
err = mt.AddPartition(coll.ID, "no-part", 22, ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
@ -594,20 +578,15 @@ func TestMetaTable(t *testing.T) {
coll.PartitionNames = []string{partName}
coll.PartitionCreatedTimestamps = []uint64{ftso()}
mt.collID2Meta[coll.ID] = coll
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, nil) })
assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, "") })
//err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
collInfo.PartitionIDs = nil
@ -619,10 +598,10 @@ func TestMetaTable(t *testing.T) {
//_, err = mt.AddPartition(coll.ID, partName, partID, nil)
//assert.Nil(t, err)
ts = ftso()
err = mt.AddPartition(coll.ID, partName, 22, ts, nil)
err = mt.AddPartition(coll.ID, partName, 22, ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partName))
err = mt.AddPartition(coll.ID, "no-part", partID, ts, nil)
err = mt.AddPartition(coll.ID, "no-part", partID, ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partID))
})
@ -631,7 +610,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
@ -641,7 +620,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
assert.False(t, mt.HasPartition(collInfo.ID, "no-partName", 0))
@ -654,12 +633,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
@ -669,28 +643,28 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = []string{partName}
collInfo.PartitionCreatedTimestamps = []uint64{ftso()}
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
ts = ftso()
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, nil)
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "default partition cannot be deleted")
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil)
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save and remove with prefix error")
}
assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, nil) })
assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, "") })
//_, err = mt.DeletePartition(collInfo.ID, partName, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save and remove with prefix error")
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil)
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID))
})
@ -699,12 +673,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
@ -717,7 +686,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
segIdxInfo := pb.SegmentIndexInfo{
@ -745,7 +714,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts = ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
segIdxInfo.IndexID = indexID
@ -763,12 +732,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
@ -781,7 +745,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
ts = ftso()
@ -823,9 +787,9 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
coll.PartitionCreatedTimestamps = nil
ts = ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save and remove with prefix error")
}
ts = ftso()
@ -839,12 +803,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
@ -857,7 +816,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
seg, err := mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc")
@ -894,7 +853,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
@ -907,7 +866,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -966,12 +925,7 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
@ -984,7 +938,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil, 0)
@ -998,7 +952,7 @@ func TestMetaTable(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
mt.indexID2Meta = bakMeta
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) })
@ -1006,12 +960,7 @@ func TestMetaTable(t *testing.T) {
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
collInfo.PartitionIDs = nil
@ -1037,7 +986,7 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
idx.IndexName = idxInfo[0].IndexName
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) })
@ -1058,12 +1007,7 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
@ -1076,7 +1020,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
err = mt.AddCollection(collInfo, ts, idxInfo, "")
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName)
@ -1133,7 +1077,7 @@ func TestMetaWithTimestamp(t *testing.T) {
collInfo.PartitionNames = []string{partName1}
collInfo.PartitionCreatedTimestamps = []uint64{ftso()}
t1 := ftso()
err = mt.AddCollection(collInfo, t1, nil, nil)
err = mt.AddCollection(collInfo, t1, nil, "")
assert.Nil(t, err)
collInfo.ID = 2
@ -1143,7 +1087,7 @@ func TestMetaWithTimestamp(t *testing.T) {
collInfo.Schema.Name = collName2
t2 := ftso()
err = mt.AddCollection(collInfo, t2, nil, nil)
err = mt.AddCollection(collInfo, t2, nil, "")
assert.Nil(t, err)
assert.True(t, mt.HasCollection(collID1, 0))

View File

@ -362,19 +362,20 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
PhysicalChannelNames: chanNames,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddCollReq.Base.Timestamp = ts
return EncodeDdOperation(&ddCollReq, CreateCollectionDDType)
}
reason := fmt.Sprintf("create collection %d", collID)
ts, err := core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddCollReq.Base.Timestamp = ts
ddOpStr, err := EncodeDdOperation(&ddCollReq, CreateCollectionDDType)
if err != nil {
return fmt.Errorf("EncodeDdOperation fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
createCollectionFn := func() error {
// lock for ddl operation
@ -385,7 +386,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
// clear ddl timetick in all conditions
defer core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp)
err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
return fmt.Errorf("meta table add collection failed,error = %w", err)
}

View File

@ -345,18 +345,9 @@ func (ss *suffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
// MultiSave save muiltple kvs
// if ts == 0, act like TxnKV
// additions is executed before process each kvs
// each key-value (including additions result) will be treat in same logic like Save
func (ss *suffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
// apply additions
for _, addition := range additions {
k, v, err := addition(ts)
if err != nil {
continue
}
kvs[k] = v
}
// if ts == 0, act like TxnKV, with additions executed
// each key-value will be treat in same logic like Save
func (ss *suffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
if ts == 0 {
return ss.TxnKV.MultiSave(kvs)
}
@ -487,19 +478,9 @@ func (ss *suffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
// MultiSaveAndRemoveWithPrefix save muiltple kvs and remove as well
// if ts == 0, act like TxnKV
// additions is executed before process each kvs
// each key-value (including additions result) will be treat in same logic like Save
func (ss *suffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
// apply additions
for _, addition := range additions {
k, v, err := addition(ts)
if err != nil {
continue
}
saves[k] = v
}
// if ts == 0, act like TxnKV, with additions executed
// each key-value will be treat in same logic like Save
func (ss *suffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
if ts == 0 {
return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals)
}

View File

@ -1,7 +1,6 @@
package rootcoord
import (
"errors"
"fmt"
"math/rand"
"testing"
@ -314,11 +313,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ss.MultiSave(saves, ts, func(ts typeutil.Timestamp) (string, string, error) {
return "extra", "extra-value", nil
}, func(ts typeutil.Timestamp) (string, string, error) {
return "extra", "extra-value", errors.New("out of range")
})
err = ss.MultiSave(saves, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -400,11 +395,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ss.MultiSaveAndRemoveWithPrefix(sm, dm, ts, func(ts typeutil.Timestamp) (string, string, error) {
return "extra", "extra-value", nil
}, func(ts typeutil.Timestamp) (string, string, error) {
return "extra", "extra-value", errors.New("out of range")
})
err = ss.MultiSaveAndRemoveWithPrefix(sm, dm, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}

View File

@ -174,19 +174,20 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
PhysicalChannelNames: chanNames,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddCollReq.Base.Timestamp = ts
return EncodeDdOperation(&ddCollReq, CreateCollectionDDType)
}
reason := fmt.Sprintf("create collection %d", collID)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddCollReq.Base.Timestamp = ts
ddOpStr, err := EncodeDdOperation(&ddCollReq, CreateCollectionDDType)
if err != nil {
return fmt.Errorf("EncodeDdOperation fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
createCollectionFn := func() error {
// lock for ddl operation
@ -210,7 +211,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
Data: ids[pchan],
})
}
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp)
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
t.core.dmlChannels.RemoveProducerChannels(chanNames...)
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
@ -264,19 +265,20 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
CollectionID: collMeta.ID,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, DropCollectionDDType)
}
reason := fmt.Sprintf("drop collection %d", collMeta.ID)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq.Base.Timestamp = ts
ddOpStr, err := EncodeDdOperation(&ddReq, DropCollectionDDType)
if err != nil {
return fmt.Errorf("EncodeDdOperation fail, error = %w", err)
}
aliases := t.core.MetaTable.ListAliases(collMeta.ID)
// use lambda function here to guarantee all resources to be released
@ -289,7 +291,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOp)
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr)
if err != nil {
return err
}
@ -493,19 +495,20 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
PartitionID: partID,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, CreatePartitionDDType)
}
reason := fmt.Sprintf("create partition %s", t.Req.PartitionName)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq.Base.Timestamp = ts
ddOpStr, err := EncodeDdOperation(&ddReq, CreatePartitionDDType)
if err != nil {
return fmt.Errorf("EncodeDdOperation fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
createPartitionFn := func() error {
// lock for ddl operation
@ -516,7 +519,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOp)
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr)
if err != nil {
return err
}
@ -588,19 +591,20 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
PartitionID: partID,
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, DropPartitionDDType)
}
reason := fmt.Sprintf("drop partition %s", t.Req.PartitionName)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq.Base.Timestamp = ts
ddOpStr, err := EncodeDdOperation(&ddReq, DropPartitionDDType)
if err != nil {
return fmt.Errorf("EncodeDdOperation fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
dropPartitionFn := func() error {
// lock for ddl operation
@ -611,7 +615,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOp)
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr)
if err != nil {
return err
}