Fix timestamp go back issue for timeticksync (#7123)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/7152/head
Cai Yudong 2021-08-18 14:36:10 +08:00 committed by GitHub
parent b4eb20f2c0
commit 305fa9c17e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 514 additions and 317 deletions

View File

@ -167,7 +167,7 @@ func TestGrpcService(t *testing.T) {
timeTickArray := make([]typeutil.Timestamp, 0, 16)
timeTickLock := sync.Mutex{}
core.SendTimeTick = func(ts typeutil.Timestamp) error {
core.SendTimeTick = func(ts typeutil.Timestamp, reason string) error {
timeTickLock.Lock()
defer timeTickLock.Unlock()
t.Logf("send time tick %d", ts)

View File

@ -36,9 +36,9 @@ type TxnKV interface {
}
type SnapShotKV interface {
Save(key, value string) (typeutil.Timestamp, error)
Save(key string, value string, ts typeutil.Timestamp) error
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
}

View File

@ -146,7 +146,7 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) {
defer d.lock.Unlock()
for _, name := range names {
log.Debug("delete dml channel", zap.String("channel name", name))
//log.Debug("delete dml channel", zap.String("channel name", name))
if ds, ok := d.dml[name]; ok {
ds.valid = false
}

View File

@ -35,11 +35,10 @@ type rtPair struct {
}
type metaSnapshot struct {
cli *clientv3.Client
root string
tsKey string
lock sync.RWMutex
timeAllactor func() typeutil.Timestamp
cli *clientv3.Client
root string
tsKey string
lock sync.RWMutex
ts2Rev []rtPair
minPos int
@ -47,20 +46,19 @@ type metaSnapshot struct {
numTs int
}
func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int, timeAllactor func() typeutil.Timestamp) (*metaSnapshot, error) {
func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*metaSnapshot, error) {
if bufSize <= 0 {
bufSize = 1024
}
ms := &metaSnapshot{
cli: cli,
root: root,
tsKey: tsKey,
lock: sync.RWMutex{},
timeAllactor: timeAllactor,
ts2Rev: make([]rtPair, bufSize),
minPos: 0,
maxPos: 0,
numTs: 0,
cli: cli,
root: root,
tsKey: tsKey,
lock: sync.RWMutex{},
ts2Rev: make([]rtPair, bufSize),
minPos: 0,
maxPos: 0,
numTs: 0,
}
if err := ms.loadTs(); err != nil {
return nil, err
@ -263,24 +261,23 @@ func (ms *metaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
return 0, fmt.Errorf("can't find revision on ts=%d", ts)
}
func (ms *metaSnapshot) Save(key, value string) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
resp, err := ms.cli.Txn(ctx).If().Then(
clientv3.OpPut(path.Join(ms.root, key), value),
clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs),
).Commit()
if err != nil {
return 0, err
return err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
return nil
}
func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
@ -313,18 +310,18 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error)
return string(resp.Kvs[0].Value), nil
}
func (ms *metaSnapshot) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(kvs)+2)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, addition := range additions {
if addition == nil {
continue
@ -336,10 +333,10 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string, additions ...func(ts ty
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return 0, err
return err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
return nil
}
func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
@ -378,18 +375,18 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str
return keys, values, nil
}
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, addition := range additions {
if addition == nil {
continue
@ -404,8 +401,8 @@ func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, re
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return 0, err
return err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
return nil
}

View File

@ -41,20 +41,21 @@ func TestMetaSnapshot(t *testing.T) {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 8; i++ {
vtso = typeutil.Timestamp(100 + i)
ts, err := ms.Save("abc", fmt.Sprintf("value-%d", i))
ts := ftso()
err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
_, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i))
assert.Nil(t, err)
}
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
}
@ -224,13 +225,14 @@ func TestLoad(t *testing.T) {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.Save("key", fmt.Sprintf("value-%d", i))
ts := ftso()
err = ms.Save("key", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -243,7 +245,7 @@ func TestLoad(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
@ -271,14 +273,15 @@ func TestMultiSave(t *testing.T) {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSave(saves)
ts := ftso()
err = ms.MultiSave(saves, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -301,7 +304,7 @@ func TestMultiSave(t *testing.T) {
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
@ -334,13 +337,14 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
}
defer etcdCli.Close()
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i))
ts := ftso()
err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -348,7 +352,8 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm)
ts := ftso()
err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -370,7 +375,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.Equal(t, 39-i, len(vals))
}
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)

View File

@ -191,52 +191,52 @@ func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error
}
}
func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) {
func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
mt.tenantLock.Lock()
defer mt.tenantLock.Unlock()
k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
v := proto.MarshalTextString(te)
ts, err := mt.client.Save(k, v)
err := mt.client.Save(k, v, ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
}
mt.tenantID2Meta[te.ID] = *te
return ts, nil
return nil
}
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) {
func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
mt.proxyLock.Lock()
defer mt.proxyLock.Unlock()
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
v := proto.MarshalTextString(po)
ts, err := mt.client.Save(k, v)
err := mt.client.Save(k, v, ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
}
mt.proxyID2Meta[po.ID] = *po
return ts, nil
return nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
return 0, fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
return fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
}
if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
return 0, fmt.Errorf("collection %s exist", coll.Schema.Name)
return fmt.Errorf("collection %s exist", coll.Schema.Name)
}
if len(coll.FieldIndexes) != len(idx) {
return 0, fmt.Errorf("incorrect index id when creating collection")
return fmt.Errorf("incorrect index id when creating collection")
}
for _, i := range idx {
@ -266,22 +266,22 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo,
return k1, v1, nil
}
ts, err := mt.client.MultiSave(meta, addition, saveColl)
err := mt.client.MultiSave(meta, ts, addition, saveColl)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
}
return ts, nil
return nil
}
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return 0, fmt.Errorf("can't find collection. id = %d", collID)
return fmt.Errorf("can't find collection. id = %d", collID)
}
delete(mt.collID2Meta, collID)
@ -319,13 +319,13 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts
// save ddOpStr into etcd
var saveMeta = map[string]string{}
addition := mt.getAdditionKV(ddOpStr, saveMeta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition)
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
}
return ts, nil
return nil
}
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
@ -452,37 +452,37 @@ func (mt *metaTable) ListCollectionPhysicalChannels() []string {
return plist
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
if !ok {
return 0, fmt.Errorf("can't find collection. id = %d", collID)
return fmt.Errorf("can't find collection. id = %d", collID)
}
// number of partition tags (except _default) should be limited to 4096 by default
if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
}
if len(coll.PartitionIDs) != len(coll.PartitionNames) {
return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
}
if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
}
if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
return 0, fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
}
for idx := range coll.PartitionIDs {
if coll.PartitionIDs[idx] == partitionID {
return 0, fmt.Errorf("partition id = %d already exists", partitionID)
return fmt.Errorf("partition id = %d already exists", partitionID)
}
if coll.PartitionNames[idx] == partitionName {
return 0, fmt.Errorf("partition name = %s already exists", partitionName)
return fmt.Errorf("partition name = %s already exists", partitionName)
}
// no necessary to check created timestamp
}
@ -504,12 +504,12 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
return k1, v1, nil
}
ts, err := mt.client.MultiSave(meta, addition, saveColl)
err := mt.client.MultiSave(meta, ts, addition, saveColl)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
}
return ts, nil
return nil
}
func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
@ -589,18 +589,17 @@ func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string
return err == nil
}
//return timestamp, partition id, error
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) {
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if partitionName == Params.DefaultPartitionName {
return 0, 0, fmt.Errorf("default partition cannot be deleted")
return 0, fmt.Errorf("default partition cannot be deleted")
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return 0, 0, fmt.Errorf("can't find collection id = %d", collID)
return 0, fmt.Errorf("can't find collection id = %d", collID)
}
// check tag exists
@ -621,7 +620,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
}
if !exist {
return 0, 0, fmt.Errorf("partition %s does not exist", partitionName)
return 0, fmt.Errorf("partition %s does not exist", partitionName)
}
collMeta.PartitionIDs = pd
collMeta.PartitionNames = pn
@ -646,21 +645,21 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition)
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
}
return ts, partID, nil
return partID, nil
}
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
if !ok {
return 0, fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
}
exist := false
for _, fidx := range collMeta.FieldIndexes {
@ -670,7 +669,7 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta
}
}
if !exist {
return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
}
segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
@ -686,9 +685,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta
if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
if segIdxInfo.BuildID == tmpInfo.BuildID {
log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
return 0, nil
return nil
}
return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
}
}
}
@ -699,31 +698,31 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timesta
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
v := proto.MarshalTextString(segIdxInfo)
ts, err := mt.client.Save(k, v)
err := mt.client.Save(k, v, ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
}
return ts, nil
return nil
}
//return timestamp, index id, is dropped, error
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) {
func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collID, ok := mt.collName2ID[collName]
if !ok {
return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName)
return 0, false, fmt.Errorf("collection name = %s not exist", collName)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return 0, 0, false, fmt.Errorf("collection name = %s not has meta", collName)
return 0, false, fmt.Errorf("collection name = %s not has meta", collName)
}
fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
if err != nil {
return 0, 0, false, err
return 0, false, err
}
fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
var dropIdxID typeutil.UniqueID
@ -748,7 +747,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
}
if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
return 0, 0, false, nil
return 0, false, nil
}
collMeta.FieldIndexes = fieldIdxInfo
mt.collID2Meta[collID] = collMeta
@ -772,13 +771,13 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
}
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
}
return ts, dropIdxID, true, nil
return dropIdxID, true, nil
}
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
@ -946,12 +945,11 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta)
err = mt.client.MultiSave(meta, 0)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")
}
} else {
idxInfo.IndexID = existInfo.IndexID
if existInfo.IndexName != idxInfo.IndexName { //replace index name
@ -969,7 +967,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta)
err = mt.client.MultiSave(meta, 0)
if err != nil {
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
panic("SnapShotKV MultiSave fail")

View File

@ -30,9 +30,9 @@ type mockTestKV struct {
kv.TxnKV
loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error)
save func(key, value string) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
save func(key, value string, ts typeutil.Timestamp) error
multiSave func(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error
}
func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
@ -42,16 +42,16 @@ func (m *mockTestKV) Load(key string, ts typeutil.Timestamp) (string, error) {
return "", nil
}
func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) {
return m.save(key, value)
func (m *mockTestKV) Save(key, value string, ts typeutil.Timestamp) error {
return m.save(key, value, ts)
}
func (m *mockTestKV) MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSave(kvs, additions...)
func (m *mockTestKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
return m.multiSave(kvs, ts, additions...)
}
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals, additions...)
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp, additions ...func(ts typeutil.Timestamp) (string, string, error)) error {
return m.multiSaveAndRemoveWithPrefix(saves, removals, ts, additions...)
}
func Test_MockKV(t *testing.T) {
@ -118,19 +118,19 @@ func Test_MockKV(t *testing.T) {
m1, err := NewMetaTable(k1)
assert.Nil(t, err)
k1.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save tenant error")
k1.save = func(key string, value string, ts typeutil.Timestamp) error {
return fmt.Errorf("save tenant error")
}
assert.Panics(t, func() { m1.AddTenant(&pb.TenantMeta{}) })
//_, err = m1.AddTenant(&pb.TenantMeta{})
assert.Panics(t, func() { m1.AddTenant(&pb.TenantMeta{}, 0) })
//err = m1.AddTenant(&pb.TenantMeta{}, 0)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "save tenant error")
k1.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save proxy error")
k1.save = func(key string, value string, ts typeutil.Timestamp) error {
return fmt.Errorf("save proxy error")
}
assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}) })
//_, err = m1.AddProxy(&pb.ProxyMeta{})
assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}, 0) })
//err = m1.AddProxy(&pb.ProxyMeta{}, 0)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "save proxy error")
}
@ -167,7 +167,7 @@ func TestMetaTable(t *testing.T) {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
assert.Nil(t, err)
defer etcdCli.Close()
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso)
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
mt, err := NewMetaTable(skv)
@ -242,17 +242,18 @@ func TestMetaTable(t *testing.T) {
}
t.Run("add collection", func(t *testing.T) {
_, err = mt.AddCollection(collInfo, nil, ddOp)
ts := ftso()
err = mt.AddCollection(collInfo, ts, nil, ddOp)
assert.NotNil(t, err)
ts, err := mt.AddCollection(collInfo, idxInfo, ddOp)
err = mt.AddCollection(collInfo, ts, idxInfo, ddOp)
assert.Nil(t, err)
assert.Equal(t, ts, uint64(1))
assert.Equal(t, uint64(1), ts)
collMeta, err := mt.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, ts, collMeta.CreateTime)
assert.Equal(t, ts, collMeta.PartitionCreatedTimestamps[0])
assert.Equal(t, collMeta.CreateTime, ts)
assert.Equal(t, collMeta.PartitionCreatedTimestamps[0], ts)
assert.Equal(t, partIDDefault, collMeta.PartitionIDs[0])
assert.Equal(t, 1, len(collMeta.PartitionIDs))
@ -269,7 +270,8 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add partition", func(t *testing.T) {
ts, err := mt.AddPartition(collID, partName, partID, ddOp)
ts := ftso()
err = mt.AddPartition(collID, partName, partID, ts, ddOp)
assert.Nil(t, err)
assert.Equal(t, ts, uint64(2))
@ -294,15 +296,15 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID,
BuildID: buildID,
}
_, err := mt.AddIndex(&segIdxInfo)
err = mt.AddIndex(&segIdxInfo, 0)
assert.Nil(t, err)
// it's legal to add index twice
_, err = mt.AddIndex(&segIdxInfo)
err = mt.AddIndex(&segIdxInfo, 0)
assert.Nil(t, err)
segIdxInfo.BuildID = 202
_, err = mt.AddIndex(&segIdxInfo)
err = mt.AddIndex(&segIdxInfo, 0)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
})
@ -388,12 +390,12 @@ func TestMetaTable(t *testing.T) {
te := pb.TenantMeta{
ID: 100,
}
_, err := mt.AddTenant(&te)
err := mt.AddTenant(&te, 0)
assert.Nil(t, err)
po := pb.ProxyMeta{
ID: 101,
}
_, err = mt.AddProxy(&po)
err = mt.AddProxy(&po, 0)
assert.Nil(t, err)
_, err = NewMetaTable(skv)
@ -401,12 +403,12 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop index", func(t *testing.T) {
_, idx, ok, err := mt.DropIndex("testColl", "field110", "field110")
idx, ok, err := mt.DropIndex("testColl", "field110", "field110", 0)
assert.Nil(t, err)
assert.True(t, ok)
assert.Equal(t, indexID, idx)
_, _, ok, err = mt.DropIndex("testColl", "field110", "field110-error")
_, ok, err = mt.DropIndex("testColl", "field110", "field110-error", 0)
assert.Nil(t, err)
assert.False(t, ok)
@ -424,7 +426,8 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop partition", func(t *testing.T) {
_, id, err := mt.DeletePartition(collID, partName, nil)
ts := ftso()
id, err := mt.DeletePartition(collID, partName, ts, nil)
assert.Nil(t, err)
assert.Equal(t, partID, id)
@ -435,9 +438,10 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop collection", func(t *testing.T) {
_, err = mt.DeleteCollection(collIDInvalid, nil)
ts := ftso()
err = mt.DeleteCollection(collIDInvalid, ts, nil)
assert.NotNil(t, err)
_, err = mt.DeleteCollection(collID, nil)
err = mt.DeleteCollection(collID, ts, nil)
assert.Nil(t, err)
// check DD operation flag
@ -454,52 +458,54 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
assert.Panics(t, func() { mt.AddCollection(collInfo, idxInfo, nil) })
//_, err := mt.AddCollection(collInfo, idxInfo, nil)
assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, nil) })
//err = mt.AddCollection(collInfo, 0, idxInfo, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
})
t.Run("delete collection failed", func(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save and remove with prefix error")
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err := mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, nil) })
//_, err = mt.DeleteCollection(collInfo.ID, nil)
ts = ftso()
assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, nil) })
//err = mt.DeleteCollection(collInfo.ID, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save and remove with prefix error")
})
t.Run("get collection failed", func(t *testing.T) {
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err := mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -510,8 +516,8 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add partition failed", func(t *testing.T) {
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
@ -522,17 +528,19 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(2, "no-part", 22, nil)
ts = ftso()
err = mt.AddPartition(2, "no-part", 22, ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
@ -540,22 +548,21 @@ func TestMetaTable(t *testing.T) {
coll.PartitionNames = []string{partName}
coll.PartitionCreatedTimestamps = []uint64{ftso()}
mt.collID2Meta[coll.ID] = coll
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, nil) })
//_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
assert.Panics(t, func() { mt.AddPartition(coll.ID, "no-part", 22, ts, nil) })
//err = mt.AddPartition(coll.ID, "no-part", 22, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
@ -565,10 +572,11 @@ func TestMetaTable(t *testing.T) {
//assert.Nil(t, err)
//_, err = mt.AddPartition(coll.ID, partName, partID, nil)
//assert.Nil(t, err)
_, err = mt.AddPartition(coll.ID, partName, 22, nil)
ts = ftso()
err = mt.AddPartition(coll.ID, partName, 22, ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partName))
_, err = mt.AddPartition(coll.ID, "no-part", partID, nil)
err = mt.AddPartition(coll.ID, "no-part", partID, ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partID))
})
@ -577,8 +585,8 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
@ -586,7 +594,8 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
assert.False(t, mt.HasPartition(collInfo.ID, "no-partName", 0))
@ -599,14 +608,13 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
@ -614,27 +622,29 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = []int64{partID}
collInfo.PartitionNames = []string{partName}
collInfo.PartitionCreatedTimestamps = []uint64{ftso()}
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
_, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, nil)
ts = ftso()
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "default partition cannot be deleted")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", nil)
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save and remove with prefix error")
}
assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, nil) })
//_, _, err = mt.DeletePartition(collInfo.ID, partName, nil)
assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, nil) })
//_, err = mt.DeletePartition(collInfo.ID, partName, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save and remove with prefix error")
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, _, err = mt.DeletePartition(collInfo.ID, "abc", nil)
_, err = mt.DeletePartition(collInfo.ID, "abc", ts, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID))
})
@ -643,25 +653,25 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
segIdxInfo := pb.SegmentIndexInfo{
@ -672,12 +682,13 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID2,
BuildID: buildID,
}
_, err = mt.AddIndex(&segIdxInfo)
ts = ftso()
err = mt.AddIndex(&segIdxInfo, ts)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.AddIndex(&segIdxInfo)
err = mt.AddIndex(&segIdxInfo, ts)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
@ -687,15 +698,17 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts = ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
segIdxInfo.IndexID = indexID
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save error")
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return fmt.Errorf("save error")
}
assert.Panics(t, func() { mt.AddIndex(&segIdxInfo) })
//_, err = mt.AddIndex(&segIdxInfo)
ts = ftso()
assert.Panics(t, func() { mt.AddIndex(&segIdxInfo, ts) })
//err = mt.AddIndex(&segIdxInfo, ts)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "save error")
})
@ -704,17 +717,16 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
@ -722,19 +734,21 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
_, _, _, err = mt.DropIndex("abc", "abc", "abc")
ts = ftso()
_, _, err = mt.DropIndex("abc", "abc", "abc", ts)
assert.NotNil(t, err)
assert.EqualError(t, err, "collection name = abc not exist")
mt.collName2ID["abc"] = 2
_, _, _, err = mt.DropIndex("abc", "abc", "abc")
_, _, err = mt.DropIndex("abc", "abc", "abc", ts)
assert.NotNil(t, err)
assert.EqualError(t, err, "collection name = abc not has meta")
_, _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc")
_, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc", ts)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed abc", collInfo.Schema.Name))
@ -751,7 +765,8 @@ func TestMetaTable(t *testing.T) {
}
mt.collID2Meta[coll.ID] = coll
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
ts = ftso()
idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts)
assert.Zero(t, idxID)
assert.False(t, isDroped)
assert.Nil(t, err)
@ -761,13 +776,15 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
coll.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts = ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save and remove with prefix error")
}
assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) })
//_, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
ts = ftso()
assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts) })
//_, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save and remove with prefix error")
})
@ -776,17 +793,16 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
@ -794,7 +810,8 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
seg, err := mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc")
@ -811,7 +828,8 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID,
BuildID: buildID,
}
_, err = mt.AddIndex(&segIdxInfo)
ts = ftso()
err = mt.AddIndex(&segIdxInfo, ts)
assert.Nil(t, err)
idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName)
assert.Nil(t, err)
@ -830,11 +848,11 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
@ -842,7 +860,8 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -901,17 +920,16 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err = mt.reloadFromKV()
assert.Nil(t, err)
@ -919,7 +937,8 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil)
@ -933,27 +952,26 @@ func TestMetaTable(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
mt.indexID2Meta = bakMeta
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
//_, err = mt.AddCollection(collInfo, idxInfo, nil)
//err = mt.AddCollection(collInfo, ts, idxInfo, nil)
//assert.Nil(t, err)
coll, ok := mt.collID2Meta[collInfo.ID]
assert.True(t, ok)
@ -973,8 +991,8 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
idx.IndexName = idxInfo[0].IndexName
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
@ -994,17 +1012,16 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, addition ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ts := ftso()
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp, addition ...func(ts typeutil.Timestamp) (string, string, error)) error {
for _, a := range addition {
if a != nil {
a(ts)
}
}
return 0, nil
return nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err = mt.reloadFromKV()
assert.Nil(t, err)
@ -1012,7 +1029,8 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
_, err = mt.AddCollection(collInfo, idxInfo, nil)
ts := ftso()
err = mt.AddCollection(collInfo, ts, idxInfo, nil)
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName)
@ -1052,7 +1070,7 @@ func TestMetaWithTimestamp(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso)
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
mt, err := NewMetaTable(skv)
@ -1068,7 +1086,8 @@ func TestMetaWithTimestamp(t *testing.T) {
collInfo.PartitionIDs = []int64{partID1}
collInfo.PartitionNames = []string{partName1}
collInfo.PartitionCreatedTimestamps = []uint64{ftso()}
t1, err := mt.AddCollection(collInfo, nil, nil)
t1 := ftso()
err = mt.AddCollection(collInfo, t1, nil, nil)
assert.Nil(t, err)
collInfo.ID = 2
@ -1077,7 +1096,8 @@ func TestMetaWithTimestamp(t *testing.T) {
collInfo.PartitionCreatedTimestamps = []uint64{ftso()}
collInfo.Schema.Name = collName2
t2, err := mt.AddCollection(collInfo, nil, nil)
t2 := ftso()
err = mt.AddCollection(collInfo, t2, nil, nil)
assert.Nil(t, err)
assert.True(t, mt.HasCollection(collID1, 0))

View File

@ -17,6 +17,7 @@ import (
"fmt"
"math/rand"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
@ -85,8 +86,11 @@ type Core struct {
etcdCli *clientv3.Client
kvBase *etcdkv.EtcdKV
//DDL lock
ddlLock sync.Mutex
//setMsgStreams, send time tick into dd channel and time tick channel
SendTimeTick func(t typeutil.Timestamp) error
SendTimeTick func(t typeutil.Timestamp, reason string) error
//setMsgStreams, send create collection into dd channel
SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error
@ -152,6 +156,7 @@ func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
core := &Core{
ctx: ctx,
cancel: cancel,
ddlLock: sync.Mutex{},
msFactory: factory,
}
core.UpdateStateCode(internalpb.StateCode_Abnormal)
@ -232,9 +237,11 @@ func (c *Core) startTimeTickLoop() {
log.Debug("rootcoord context closed", zap.Error(c.ctx.Err()))
return
case <-ticker.C:
c.ddlLock.Lock()
if ts, err := c.TSOAllocator(1); err == nil {
c.SendTimeTick(ts)
c.SendTimeTick(ts, "timetick loop")
}
c.ddlLock.Unlock()
}
}
}
@ -352,8 +359,12 @@ func (c *Core) checkFlushedSegmentsLoop() {
if info.BuildID != 0 {
info.EnableIndex = true
}
if _, err := c.MetaTable.AddIndex(&info); err != nil {
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
if err := c.MetaTable.AddIndex(&info, 0); err != nil {
log.Debug("Add index into meta table failed",
zap.Int64("collection_id", collMeta.ID),
zap.Int64("index_id", info.IndexID),
zap.Int64("build_id", info.BuildID),
zap.Error(err))
}
}
}
@ -396,11 +407,8 @@ func (c *Core) setDdMsgSendFlag(b bool) error {
return nil
}
if b {
_, err = c.MetaTable.client.Save(DDMsgSendPrefix, "true")
return err
}
_, err = c.MetaTable.client.Save(DDMsgSendPrefix, "false")
ts, _ := c.TSOAllocator(1)
err = c.MetaTable.client.Save(DDMsgSendPrefix, strconv.FormatBool(b), ts)
return err
}
@ -420,7 +428,7 @@ func (c *Core) setMsgStreams() error {
timeTickStream.AsProducer([]string{Params.TimeTickChannel})
log.Debug("rootcoord AsProducer: " + Params.TimeTickChannel)
c.SendTimeTick = func(t typeutil.Timestamp) error {
c.SendTimeTick = func(t typeutil.Timestamp, reason string) error {
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
BeginTimestamp: t,
@ -462,7 +470,11 @@ func (c *Core) setMsgStreams() error {
Timestamps: pt,
DefaultTimestamp: t,
}
return c.chanTimeTick.UpdateTimeTick(&ttMsg)
log.Debug("update timetick",
zap.Any("DefaultTs", t),
zap.Any("sourceID", c.session.ServerID),
zap.Any("reason", reason))
return c.chanTimeTick.UpdateTimeTick(&ttMsg, reason)
}
c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error {
@ -883,19 +895,8 @@ func (c *Core) Init() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil {
return initError
}
tsAlloc := func() typeutil.Timestamp {
for {
var ts typeutil.Timestamp
var err error
if ts, err = c.TSOAllocator(1); err == nil {
return ts
}
time.Sleep(100 * time.Millisecond)
log.Debug("alloc time stamp error", zap.Error(err))
}
}
var ms *metaSnapshot
ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024, tsAlloc)
ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024)
if initError != nil {
return initError
}
@ -1832,7 +1833,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel
status.Reason = fmt.Sprintf("UpdateChannelTimeTick receive invalid message %d", in.Base.GetMsgType())
return status, nil
}
err := c.chanTimeTick.UpdateTimeTick(in)
err := c.chanTimeTick.UpdateTimeTick(in, "gRPC")
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
@ -1913,7 +1914,8 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
log.Error("build index fail", zap.Int64("buildid", info.BuildID), zap.Error(err))
continue
}
_, err = c.MetaTable.AddIndex(&info)
ts, _ := c.TSOAllocator(1)
err = c.MetaTable.AddIndex(&info, ts)
if err != nil {
log.Error("AddIndex fail", zap.String("err", err.Error()))
}

View File

@ -15,9 +15,9 @@ import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -25,8 +25,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type reqTask interface {
@ -180,27 +180,47 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return EncodeDdOperation(&ddCollReq, CreateCollectionDDType)
}
ts, err := t.core.MetaTable.AddCollection(&collInfo, idxInfo, ddOp)
reason := fmt.Sprintf("create collection %d", collID)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("meta table add collection failed,error = %w", err)
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// add dml channel before send dd msg
t.core.dmlChannels.AddProducerChannels(chanNames...)
// use lambda function here to guarantee all resources to be released
createCollectionFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil {
return fmt.Errorf("send dd create collection req failed, error = %w", err)
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOp)
if err != nil {
return fmt.Errorf("meta table add collection failed,error = %w", err)
}
// add dml channel before send dd msg
t.core.dmlChannels.AddProducerChannels(chanNames...)
err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil {
return fmt.Errorf("send dd create collection req failed, error = %w", err)
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
}
t.core.SendTimeTick(ts)
err = createCollectionFn()
if err != nil {
return err
}
// Update DDOperation in etcd
err = t.core.setDdMsgSendFlag(true)
if err != nil {
return fmt.Errorf("send dd msg send flag failed,error = %w", err)
}
return nil
return t.core.setDdMsgSendFlag(true)
}
type DropCollectionReqTask struct {
@ -237,21 +257,45 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return EncodeDdOperation(&ddReq, DropCollectionDDType)
}
ts, err := t.core.MetaTable.DeleteCollection(collMeta.ID, ddOp)
reason := fmt.Sprintf("drop collection %d", collMeta.ID)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
dropCollectionFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOp)
if err != nil {
return err
}
err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
// remove dml channel after send dd msg
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
return nil
}
err = dropCollectionFn()
if err != nil {
return err
}
err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
// remove dml channel after send dd msg
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
//notify query service to release collection
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil {
log.Error("CallReleaseCollectionService failed", zap.String("error", err.Error()))
@ -414,18 +458,42 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return EncodeDdOperation(&ddReq, CreatePartitionDDType)
}
ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp)
reason := fmt.Sprintf("create partition %s", t.Req.PartitionName)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
createPartitionFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOp)
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
}
err = createPartitionFn()
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
@ -482,18 +550,42 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return EncodeDdOperation(&ddReq, DropPartitionDDType)
}
ts, _, err := t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOp)
reason := fmt.Sprintf("drop partition %s", t.Req.PartitionName)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
// use lambda function here to guarantee all resources to be released
dropPartitionFn := func() error {
// lock for ddl operation
t.core.ddlLock.Lock()
defer t.core.ddlLock.Unlock()
t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
// clear ddl timetick in all conditions
defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOp)
if err != nil {
return err
}
err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames)
if err != nil {
return err
}
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason)
return nil
}
err = dropPartitionFn()
if err != nil {
return err
}
err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
@ -724,7 +816,8 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if info.BuildID != 0 {
info.EnableIndex = true
}
if _, err := t.core.MetaTable.AddIndex(&info); err != nil {
ts, _ := t.core.TSOAllocator(1)
if err := t.core.MetaTable.AddIndex(&info, ts); err != nil {
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
}
}
@ -795,6 +888,7 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
_, _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
ts, _ := t.core.TSOAllocator(1)
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName, ts)
return err
}

View File

@ -13,6 +13,7 @@ package rootcoord
import (
"fmt"
"math"
"sync"
"github.com/milvus-io/milvus/internal/log"
@ -26,11 +27,21 @@ import (
"go.uber.org/zap"
)
type ddlTimetickInfo struct {
ddlMinTs typeutil.Timestamp
ddlTsSet map[typeutil.Timestamp]struct{}
}
type timetickSync struct {
core *Core
lock sync.Mutex
proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg
// record ddl timetick info
ddlLock sync.RWMutex
ddlMinTs typeutil.Timestamp
ddlTsSet map[typeutil.Timestamp]struct{}
}
type channelTimeTickMsg struct {
@ -63,18 +74,22 @@ func newTimeTickSync(core *Core) *timetickSync {
core: core,
proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg),
sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16),
ddlLock: sync.RWMutex{},
ddlMinTs: typeutil.Timestamp(math.MaxUint64),
ddlTsSet: make(map[typeutil.Timestamp]struct{}),
}
}
// sendToChannel send all channels' timetick to sendChan
// lock is needed by the invoker
func (t *timetickSync) sendToChannel() {
func (t *timetickSync) sendToChannel() error {
if len(t.proxyTimeTick) == 0 {
return
return fmt.Errorf("proxyTimeTick empty")
}
for _, v := range t.proxyTimeTick {
if v == nil {
return
return fmt.Errorf("proxyTimeTick has not been fulfilled")
}
}
// clear proxyTimeTick and send a clone
@ -84,10 +99,57 @@ func (t *timetickSync) sendToChannel() {
t.proxyTimeTick[k] = nil
}
t.sendChan <- ptt
return nil
}
// AddDmlTimeTick add ts into ddlTimetickInfos[sourceID],
// can be used to tell if DDL operation is in process.
func (t *timetickSync) AddDdlTimeTick(ts typeutil.Timestamp, reason string) {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
if ts < t.ddlMinTs {
t.ddlMinTs = ts
}
t.ddlTsSet[ts] = struct{}{}
log.Debug("add ddl timetick", zap.Uint64("minTs", t.ddlMinTs), zap.Uint64("ts", ts),
zap.Int("len(ddlTsSet)", len(t.ddlTsSet)), zap.String("reason", reason))
}
// RemoveDdlTimeTick is invoked in UpdateTimeTick.
// It clears the ts generated by AddDdlTimeTick, indicates DDL operation finished.
func (t *timetickSync) RemoveDdlTimeTick(ts typeutil.Timestamp, reason string) {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
delete(t.ddlTsSet, ts)
log.Debug("remove ddl timetick", zap.Uint64("ts", ts), zap.Int("len(ddlTsSet)", len(t.ddlTsSet)),
zap.String("reason", reason))
if len(t.ddlTsSet) == 0 {
t.ddlMinTs = typeutil.Timestamp(math.MaxUint64)
} else if t.ddlMinTs == ts {
// re-calculate minTs
minTs := typeutil.Timestamp(math.MaxUint64)
for tt := range t.ddlTsSet {
if tt < minTs {
minTs = tt
}
}
t.ddlMinTs = ts
log.Debug("update ddl minTs", zap.Any("minTs", ts))
}
}
func (t *timetickSync) GetDdlMinTimeTick() typeutil.Timestamp {
t.ddlLock.Lock()
defer t.ddlLock.Unlock()
return t.ddlMinTs
}
// UpdateTimeTick check msg validation and send it to local channel
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error {
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason string) error {
t.lock.Lock()
defer t.lock.Unlock()
if len(in.ChannelNames) == 0 && in.DefaultTimestamp == 0 {
@ -101,20 +163,40 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error {
if !ok {
return fmt.Errorf("Skip ChannelTimeTickMsg from un-recognized proxy node %d", in.Base.SourceID)
}
// if ddl operation not finished, skip current ts update
ddlMinTs := t.GetDdlMinTimeTick()
if in.DefaultTimestamp > ddlMinTs {
log.Debug("ddl not finished", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
zap.Uint64("ddlMinTs", ddlMinTs),
zap.String("reason", reason))
return nil
}
if in.Base.SourceID == t.core.session.ServerID {
if prev != nil && prev.in.DefaultTimestamp >= in.DefaultTimestamp {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("prev ts", prev.in.DefaultTimestamp), zap.Uint64("curr ts", in.DefaultTimestamp))
if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp),
zap.Uint64("prev ts", prev.in.DefaultTimestamp),
zap.String("reason", reason))
return nil
}
}
if in.DefaultTimestamp == 0 {
mints := minTimeTick(in.Timestamps...)
log.Debug("default time stamp is zero, set it to the min value of inputs", zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints))
log.Debug("default time stamp is zero, set it to the min value of inputs",
zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints))
in.DefaultTimestamp = mints
}
t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in)
t.sendToChannel()
log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
if err := t.sendToChannel(); err != nil {
log.Debug("sendToChannel fail", zap.Any("err", err.Error()))
}
return nil
}
@ -168,7 +250,6 @@ func (t *timetickSync) StartWatch() {
log.Debug("SendChannelTimeTick fail", zap.Error(err))
}
}
}
}
}