Set CollectionInfo.CreateTime to the timestamp when save the metadata into etcd, (#6763)

not the timestamp when RootCoord receive the CreateCollection Request

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/6773/head
neza2017 2021-07-23 14:36:12 +08:00 committed by GitHub
parent b2f549d6a7
commit 5adb69161f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 157 additions and 67 deletions

View File

@ -38,7 +38,7 @@ type TxnKV interface {
type SnapShotKV interface {
Save(key, value string) (typeutil.Timestamp, error)
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}

View File

@ -313,7 +313,7 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error)
return string(resp.Kvs[0].Value), nil
}
func (ms *metaSnapshot) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@ -325,7 +325,10 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string, addition func(ts typeut
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
if addition != nil {
for _, addition := range additions {
if addition == nil {
continue
}
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}
@ -375,7 +378,7 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str
return keys, values, nil
}
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@ -387,7 +390,10 @@ func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, re
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
if addition != nil {
for _, addition := range additions {
if addition == nil {
continue
}
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}

View File

@ -278,7 +278,7 @@ func TestMultiSave(t *testing.T) {
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSave(saves, nil)
ts, err := ms.MultiSave(saves)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -348,7 +348,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm, nil)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}

View File

@ -227,7 +227,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo,
if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
return 0, fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
}
if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
@ -237,15 +237,11 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo,
return 0, fmt.Errorf("incorrect index id when creating collection")
}
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
for _, i := range idx {
mt.indexID2Meta[i.IndexID] = *i
}
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1 := proto.MarshalTextString(coll)
meta := map[string]string{k1: v1}
meta := make(map[string]string)
for _, i := range idx {
k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
@ -255,7 +251,20 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo,
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSave(meta, addition)
saveColl := func(ts typeutil.Timestamp) (string, string, error) {
coll.CreateTime = ts
if len(coll.PartitionCreatedTimestamps) == 1 {
coll.PartitionCreatedTimestamps[0] = ts
}
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1 := proto.MarshalTextString(coll)
meta[k1] = v1
return k1, v1, nil
}
ts, err := mt.client.MultiSave(meta, addition, saveColl)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
@ -441,7 +450,7 @@ func (mt *metaTable) ListCollectionPhysicalChannels() []string {
return plist
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, createdTimestamp uint64, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
@ -475,19 +484,25 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
}
// no necessary to check created timestamp
}
coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
coll.PartitionNames = append(coll.PartitionNames, partitionName)
coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, createdTimestamp)
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1 := proto.MarshalTextString(&coll)
meta := map[string]string{k1: v1}
meta := make(map[string]string)
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSave(meta, addition)
saveColl := func(ts typeutil.Timestamp) (string, string, error) {
coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
coll.PartitionNames = append(coll.PartitionNames, partitionName)
coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1 := proto.MarshalTextString(&coll)
meta[k1] = v1
return k1, v1, nil
}
ts, err := mt.client.MultiSave(meta, addition, saveColl)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
@ -755,7 +770,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
}
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, nil)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
if err != nil {
_ = mt.reloadFromKV()
return 0, 0, false, err
@ -929,7 +944,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta, nil)
_, err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err
@ -952,7 +967,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta, nil)
_, err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err

View File

@ -31,8 +31,8 @@ type mockTestKV struct {
loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error)
save func(key, value string) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}
func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
@ -46,12 +46,12 @@ func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) {
return m.save(key, value)
}
func (m *mockTestKV) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSave(kvs, addition)
func (m *mockTestKV) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSave(kvs, additions...)
}
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals, addition)
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals, additions...)
}
func Test_MockKV(t *testing.T) {
@ -243,11 +243,15 @@ func TestMetaTable(t *testing.T) {
_, err = mt.AddCollection(collInfo, nil, ddOp)
assert.NotNil(t, err)
_, err = mt.AddCollection(collInfo, idxInfo, ddOp)
ts, err := mt.AddCollection(collInfo, idxInfo, ddOp)
assert.Nil(t, err)
assert.Equal(t, ts, uint64(1))
collMeta, err := mt.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, ts, collMeta.CreateTime)
assert.Equal(t, ts, collMeta.PartitionCreatedTimestamps[0])
assert.Equal(t, partIDDefault, collMeta.PartitionIDs[0])
assert.Equal(t, 1, len(collMeta.PartitionIDs))
assert.True(t, mt.HasCollection(collInfo.ID, 0))
@ -263,8 +267,15 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add partition", func(t *testing.T) {
_, err := mt.AddPartition(collID, partName, partID, ftso(), ddOp)
ts, err := mt.AddPartition(collID, partName, partID, ddOp)
assert.Nil(t, err)
assert.Equal(t, ts, uint64(2))
collMeta, ok := mt.collID2Meta[collID]
assert.True(t, ok)
assert.Equal(t, 2, len(collMeta.PartitionNames))
assert.Equal(t, collMeta.PartitionNames[1], partName)
assert.Equal(t, ts, collMeta.PartitionCreatedTimestamps[1])
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
@ -441,7 +452,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
@ -453,14 +464,21 @@ func TestMetaTable(t *testing.T) {
})
t.Run("delete collection failed", func(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err := mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
@ -476,6 +494,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err := mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
@ -498,17 +517,18 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(2, "no-part", 22, ftso(), nil)
_, err = mt.AddPartition(2, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
_, err = mt.AddPartition(coll.ID, "no-part", 22, ftso(), nil)
_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
@ -516,26 +536,34 @@ func TestMetaTable(t *testing.T) {
coll.PartitionNames = []string{partName}
coll.PartitionCreatedTimestamps = []uint64{ftso()}
mt.collID2Meta[coll.ID] = coll
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, err = mt.AddPartition(coll.ID, "no-part", 22, ftso(), nil)
_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(coll.ID, partName, partID, ftso(), nil)
_, err = mt.AddPartition(coll.ID, partName, partID, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(coll.ID, partName, 22, ftso(), nil)
_, err = mt.AddPartition(coll.ID, partName, 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partName))
_, err = mt.AddPartition(coll.ID, "no-part", partID, ftso(), nil)
_, err = mt.AddPartition(coll.ID, "no-part", partID, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partID))
})
@ -544,7 +572,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
@ -552,6 +580,7 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
@ -565,7 +594,13 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
err := mt.reloadFromKV()
@ -585,7 +620,7 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, _, err = mt.DeletePartition(collInfo.ID, partName, nil)
@ -602,7 +637,13 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -656,7 +697,13 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -709,7 +756,7 @@ func TestMetaTable(t *testing.T) {
coll.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
@ -721,7 +768,13 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -769,7 +822,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -840,7 +893,13 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -866,14 +925,20 @@ func TestMetaTable(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
mt.indexID2Meta = bakMeta
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
collInfo.PartitionIDs = nil
@ -881,8 +946,10 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
assert.Nil(t, err)
coll := mt.collID2Meta[collInfo.ID]
coll, ok := mt.collID2Meta[collInfo.ID]
assert.True(t, ok)
coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1})
mt.collID2Meta[coll.ID] = coll
anotherIdx := pb.IndexInfo{
IndexName: "no-index",
@ -897,7 +964,7 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
idx.IndexName = idxInfo[0].IndexName
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
@ -917,7 +984,13 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {

View File

@ -123,7 +123,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if err != nil {
return fmt.Errorf("alloc collection id error = %w", err)
}
collTs := t.Req.Base.Timestamp
partID, _, err := t.core.IDAllocator(1)
if err != nil {
return fmt.Errorf("alloc partition id error = %w", err)
@ -132,7 +131,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
log.Debug("collection name -> id",
zap.String("collection name", t.Req.CollectionName),
zap.Int64("collection_id", collID),
zap.Uint64("collection created ts", collTs),
zap.Int64("default partition id", partID))
vchanNames := make([]string, t.Req.ShardsNum)
@ -145,13 +143,12 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
collInfo := etcdpb.CollectionInfo{
ID: collID,
Schema: &schema,
CreateTime: collTs,
PartitionIDs: []typeutil.UniqueID{partID},
PartitionNames: []string{Params.DefaultPartitionName},
FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
PartitionCreatedTimestamps: []uint64{collTs},
PartitionCreatedTimestamps: []uint64{0},
}
idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
@ -418,8 +415,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return EncodeDdOperation(&ddReq, CreatePartitionDDType)
}
createdTimestamp := t.Req.Base.Timestamp
ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, createdTimestamp, ddOp)
ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp)
if err != nil {
return err
}