mirror of https://github.com/milvus-io/milvus.git
Reduce info saved in SnapshotMeta (#10288)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/10289/head
parent
0120068d92
commit
ed2b8d67c0
|
@ -51,6 +51,13 @@ func NewEtcdKV(etcdEndpoints []string, rootPath string) (*EtcdKV, error) {
|
|||
return kv, nil
|
||||
}
|
||||
|
||||
func NewEtcdKVWithClient(cli *clientv3.Client, rootPath string) *EtcdKV {
|
||||
return &EtcdKV{
|
||||
client: cli,
|
||||
rootPath: rootPath,
|
||||
}
|
||||
}
|
||||
|
||||
func (kv *EtcdKV) Close() {
|
||||
kv.client.Close()
|
||||
}
|
||||
|
|
|
@ -74,7 +74,8 @@ const (
|
|||
|
||||
// MetaTable store all rootcoord meta info
|
||||
type MetaTable struct {
|
||||
client kv.SnapShotKV // client of a reliable kv service, i.e. etcd client
|
||||
txn kv.TxnKV // client of a reliable txnkv service, i.e. etcd client
|
||||
snapshot kv.SnapShotKV // client of a reliable snapshotkv service, i.e. etcd client
|
||||
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
|
||||
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
|
||||
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta
|
||||
|
@ -91,9 +92,10 @@ type MetaTable struct {
|
|||
|
||||
// NewMetaTable create meta table for rootcoord, which stores all in-memory information
|
||||
// for collection, partition, segment, index etc.
|
||||
func NewMetaTable(kv kv.SnapShotKV) (*MetaTable, error) {
|
||||
func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) {
|
||||
mt := &MetaTable{
|
||||
client: kv,
|
||||
txn: txn,
|
||||
snapshot: snap,
|
||||
tenantLock: sync.RWMutex{},
|
||||
proxyLock: sync.RWMutex{},
|
||||
ddLock: sync.RWMutex{},
|
||||
|
@ -115,7 +117,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
|
||||
mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
|
||||
|
||||
_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
|
||||
_, values, err := mt.snapshot.LoadWithPrefix(TenantMetaPrefix, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -129,7 +131,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
|
||||
}
|
||||
|
||||
_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
|
||||
_, values, err = mt.txn.LoadWithPrefix(ProxyMetaPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -143,7 +145,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
|
||||
}
|
||||
|
||||
_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
|
||||
_, values, err = mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -158,7 +160,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
|
||||
}
|
||||
|
||||
_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
|
||||
_, values, err = mt.txn.LoadWithPrefix(SegmentIndexMetaPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -190,7 +192,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
}
|
||||
}
|
||||
|
||||
_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
|
||||
_, values, err = mt.txn.LoadWithPrefix(IndexMetaPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -203,7 +205,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.indexID2Meta[meta.IndexID] = meta
|
||||
}
|
||||
|
||||
_, values, err = mt.client.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
|
||||
_, values, err = mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -232,7 +234,7 @@ func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = mt.client.Save(k, string(v), ts)
|
||||
err = mt.snapshot.Save(k, string(v), ts)
|
||||
if err != nil {
|
||||
log.Error("AddTenant Save fail", zap.Error(err))
|
||||
return err
|
||||
|
@ -242,7 +244,7 @@ func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
|
|||
}
|
||||
|
||||
// AddProxy add proxy
|
||||
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
|
||||
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta) error {
|
||||
mt.proxyLock.Lock()
|
||||
defer mt.proxyLock.Unlock()
|
||||
|
||||
|
@ -253,7 +255,7 @@ func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = mt.client.Save(k, string(v), ts)
|
||||
err = mt.txn.Save(k, string(v))
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV Save fail", zap.Error(err))
|
||||
panic("SnapShotKV Save fail")
|
||||
|
@ -313,7 +315,7 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
|
|||
meta[DDMsgSendPrefix] = "false"
|
||||
meta[DDOperationPrefix] = ddOpStr
|
||||
|
||||
err = mt.client.MultiSave(meta, ts)
|
||||
err = mt.snapshot.MultiSave(meta, ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSave fail")
|
||||
|
@ -365,15 +367,17 @@ func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time
|
|||
}
|
||||
}
|
||||
|
||||
delMetakeys := []string{
|
||||
delMetakeysSnap := []string{
|
||||
fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
|
||||
}
|
||||
delMetaKeysTxn := []string{
|
||||
fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
|
||||
fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
|
||||
}
|
||||
|
||||
for _, alias := range aliases {
|
||||
delete(mt.collAlias2ID, alias)
|
||||
delMetakeys = append(delMetakeys,
|
||||
delMetakeysSnap = append(delMetakeysSnap,
|
||||
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
|
||||
)
|
||||
}
|
||||
|
@ -384,11 +388,16 @@ func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time
|
|||
DDOperationPrefix: ddOpStr,
|
||||
}
|
||||
|
||||
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts)
|
||||
err := mt.snapshot.MultiSaveAndRemoveWithPrefix(map[string]string{}, delMetakeysSnap, ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
|
||||
}
|
||||
err = mt.txn.MultiSaveAndRemoveWithPrefix(saveMeta, delMetaKeysTxn)
|
||||
if err != nil {
|
||||
log.Warn("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
//Txn kv fail will no panic here, treated as garbage
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -402,7 +411,7 @@ func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timesta
|
|||
return ok
|
||||
}
|
||||
key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
|
||||
_, err := mt.client.Load(key, ts)
|
||||
_, err := mt.snapshot.Load(key, ts)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
|
@ -420,7 +429,7 @@ func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeut
|
|||
return colCopy.(*pb.CollectionInfo), nil
|
||||
}
|
||||
key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
|
||||
val, err := mt.client.Load(key, ts)
|
||||
val, err := mt.snapshot.Load(key, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -451,7 +460,7 @@ func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Time
|
|||
colCopy := proto.Clone(&col)
|
||||
return colCopy.(*pb.CollectionInfo), nil
|
||||
}
|
||||
_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
|
||||
_, vals, err := mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -483,7 +492,7 @@ func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.Coll
|
|||
}
|
||||
return colls, nil
|
||||
}
|
||||
_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
|
||||
_, vals, err := mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
|
||||
if err != nil {
|
||||
log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
|
||||
return nil, nil
|
||||
|
@ -585,16 +594,21 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string
|
|||
return fmt.Errorf("MetaTable AddPartition Marshal fail, k1:%s, err:%w", k1, err)
|
||||
}
|
||||
meta := map[string]string{k1: string(v1)}
|
||||
|
||||
metaTxn := map[string]string{}
|
||||
// save ddOpStr into etcd
|
||||
meta[DDMsgSendPrefix] = "false"
|
||||
meta[DDOperationPrefix] = ddOpStr
|
||||
metaTxn[DDMsgSendPrefix] = "false"
|
||||
metaTxn[DDOperationPrefix] = ddOpStr
|
||||
|
||||
err = mt.client.MultiSave(meta, ts)
|
||||
err = mt.snapshot.MultiSave(meta, ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSave fail")
|
||||
}
|
||||
err = mt.txn.MultiSave(metaTxn)
|
||||
if err != nil {
|
||||
// will not panic, missing create msg
|
||||
log.Warn("TxnKV MultiSave fail", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -615,7 +629,7 @@ func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID,
|
|||
return "", fmt.Errorf("partition %d does not exist", partitionID)
|
||||
}
|
||||
collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
|
||||
collVal, err := mt.client.Load(collKey, ts)
|
||||
collVal, err := mt.snapshot.Load(collKey, ts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -646,7 +660,7 @@ func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName
|
|||
return 0, fmt.Errorf("partition %s does not exist", partitionName)
|
||||
}
|
||||
collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
|
||||
collVal, err := mt.client.Load(collKey, ts)
|
||||
collVal, err := mt.snapshot.Load(collKey, ts)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -732,27 +746,33 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
|
|||
zap.String("key", k), zap.Error(err))
|
||||
return 0, fmt.Errorf("MetaTable DeletePartition Marshal collectionMeta fail key:%s, err:%w", k, err)
|
||||
}
|
||||
meta := map[string]string{k: string(v)}
|
||||
var delMetaKeys []string
|
||||
for _, idxInfo := range collMeta.FieldIndexes {
|
||||
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
|
||||
delMetaKeys = append(delMetaKeys, k)
|
||||
}
|
||||
|
||||
metaTxn := make(map[string]string)
|
||||
// save ddOpStr into etcd
|
||||
meta[DDMsgSendPrefix] = "false"
|
||||
meta[DDOperationPrefix] = ddOpStr
|
||||
metaTxn[DDMsgSendPrefix] = "false"
|
||||
metaTxn[DDOperationPrefix] = ddOpStr
|
||||
|
||||
err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts)
|
||||
err = mt.snapshot.Save(k, string(v), ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
|
||||
}
|
||||
err = mt.txn.MultiSaveAndRemoveWithPrefix(metaTxn, delMetaKeys)
|
||||
if err != nil {
|
||||
log.Warn("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
// will not panic, failed txn shall be treated by garbage related logic
|
||||
}
|
||||
|
||||
return partID, nil
|
||||
}
|
||||
|
||||
// AddIndex add index
|
||||
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
|
||||
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
|
@ -802,7 +822,7 @@ func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times
|
|||
return fmt.Errorf("MetaTable AddIndex Marshal segIdxInfo fail key:%s, err:%w", k, err)
|
||||
}
|
||||
|
||||
err = mt.client.Save(k, string(v), ts)
|
||||
err = mt.txn.Save(k, string(v))
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV Save fail", zap.Error(err))
|
||||
panic("SnapShotKV Save fail")
|
||||
|
@ -812,7 +832,7 @@ func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times
|
|||
}
|
||||
|
||||
// DropIndex drop index
|
||||
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
|
||||
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error) {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
|
@ -885,17 +905,17 @@ func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeuti
|
|||
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
|
||||
}
|
||||
|
||||
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts)
|
||||
err = mt.txn.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
|
||||
log.Error("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
panic("TxnKV MultiSaveAndRemoveWithPrefix fail")
|
||||
}
|
||||
|
||||
return dropIdxID, true, nil
|
||||
}
|
||||
|
||||
// GetSegmentIndexInfoByID return segment index info by segment id
|
||||
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
|
||||
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, fieldID int64, idxName string) (pb.SegmentIndexInfo, error) {
|
||||
mt.ddLock.RLock()
|
||||
defer mt.ddLock.RUnlock()
|
||||
|
||||
|
@ -903,7 +923,7 @@ func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID in
|
|||
if !ok {
|
||||
return pb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: filedID,
|
||||
FieldID: fieldID,
|
||||
IndexID: 0,
|
||||
BuildID: 0,
|
||||
EnableIndex: false,
|
||||
|
@ -913,7 +933,7 @@ func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID in
|
|||
return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
|
||||
}
|
||||
|
||||
if filedID == -1 && idxName == "" { // return default index
|
||||
if fieldID == -1 && idxName == "" { // return default index
|
||||
for _, seg := range segIdxMap {
|
||||
info, ok := mt.indexID2Meta[seg.IndexID]
|
||||
if ok && info.IndexName == Params.DefaultIndexName {
|
||||
|
@ -927,14 +947,14 @@ func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID in
|
|||
if idxMeta.IndexName != idxName {
|
||||
continue
|
||||
}
|
||||
if seg.FieldID != filedID {
|
||||
if seg.FieldID != fieldID {
|
||||
continue
|
||||
}
|
||||
return seg, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
|
||||
return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, fieldID)
|
||||
}
|
||||
|
||||
// GetFieldSchema return field schema
|
||||
|
@ -996,7 +1016,7 @@ func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema
|
|||
}
|
||||
|
||||
// GetNotIndexedSegments return segment ids which have no index
|
||||
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
|
||||
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
|
@ -1081,10 +1101,10 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
|
|||
}
|
||||
meta[k] = string(v)
|
||||
}
|
||||
err = mt.client.MultiSave(meta, ts)
|
||||
err = mt.txn.MultiSave(meta)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSave fail")
|
||||
log.Error("TxnKV MultiSave fail", zap.Error(err))
|
||||
panic("TxnKV MultiSave fail")
|
||||
}
|
||||
} else {
|
||||
idxInfo.IndexID = existInfo.IndexID
|
||||
|
@ -1113,7 +1133,7 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
|
|||
meta[k] = string(v)
|
||||
}
|
||||
|
||||
err = mt.client.MultiSave(meta, ts)
|
||||
err = mt.txn.MultiSave(meta)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSave fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSave fail")
|
||||
|
@ -1224,7 +1244,7 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts
|
|||
return fmt.Errorf("MetaTable AddAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
|
||||
}
|
||||
|
||||
err = mt.client.Save(k, string(v), ts)
|
||||
err = mt.snapshot.Save(k, string(v), ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV Save fail", zap.Error(err))
|
||||
panic("SnapShotKV Save fail")
|
||||
|
@ -1245,7 +1265,7 @@ func (mt *MetaTable) DropAlias(collectionAlias string, ts typeutil.Timestamp) er
|
|||
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias),
|
||||
}
|
||||
meta := make(map[string]string)
|
||||
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
|
||||
err := mt.snapshot.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
|
||||
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
|
||||
|
@ -1275,7 +1295,7 @@ func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, t
|
|||
return fmt.Errorf("MetaTable AlterAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
|
||||
}
|
||||
|
||||
err = mt.client.Save(k, string(v), ts)
|
||||
err = mt.snapshot.Save(k, string(v), ts)
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV Save fail", zap.Error(err))
|
||||
panic("SnapShotKV Save fail")
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
package rootcoord
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
@ -18,6 +19,7 @@ import (
|
|||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
|
@ -54,8 +56,33 @@ func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, remov
|
|||
return m.multiSaveAndRemoveWithPrefix(saves, removals, ts)
|
||||
}
|
||||
|
||||
type mockTestTxnKV struct {
|
||||
kv.TxnKV
|
||||
loadWithPrefix func(key string) ([]string, []string, error)
|
||||
save func(key, value string) error
|
||||
multiSave func(kvs map[string]string) error
|
||||
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) error
|
||||
}
|
||||
|
||||
func (m *mockTestTxnKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
return m.loadWithPrefix(key)
|
||||
}
|
||||
|
||||
func (m *mockTestTxnKV) Save(key, value string) error {
|
||||
return m.save(key, value)
|
||||
}
|
||||
|
||||
func (m *mockTestTxnKV) MultiSave(kvs map[string]string) error {
|
||||
return m.multiSave(kvs)
|
||||
}
|
||||
|
||||
func (m *mockTestTxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
return m.multiSaveAndRemoveWithPrefix(saves, removals)
|
||||
}
|
||||
|
||||
func Test_MockKV(t *testing.T) {
|
||||
k1 := &mockTestKV{}
|
||||
kt := &mockTestTxnKV{}
|
||||
prefix := make(map[string][]string)
|
||||
k1.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
|
||||
if val, ok := prefix[key]; ok {
|
||||
|
@ -63,69 +90,75 @@ func Test_MockKV(t *testing.T) {
|
|||
}
|
||||
return nil, nil, fmt.Errorf("load prefix error")
|
||||
}
|
||||
kt.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
if val, ok := prefix[key]; ok {
|
||||
return nil, val, nil
|
||||
}
|
||||
return nil, nil, fmt.Errorf("load prefix error")
|
||||
}
|
||||
|
||||
_, err := NewMetaTable(k1)
|
||||
_, err := NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
|
||||
// tenant
|
||||
prefix[TenantMetaPrefix] = []string{"tenant-prefix"}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err := proto.Marshal(&pb.TenantMeta{})
|
||||
assert.Nil(t, err)
|
||||
prefix[TenantMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// proxy
|
||||
prefix[ProxyMetaPrefix] = []string{"porxy-meta"}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.ProxyMeta{})
|
||||
assert.Nil(t, err)
|
||||
prefix[ProxyMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// collection
|
||||
prefix[CollectionMetaPrefix] = []string{"collection-meta"}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})
|
||||
assert.Nil(t, err)
|
||||
prefix[CollectionMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// segment index
|
||||
prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.SegmentIndexInfo{})
|
||||
assert.Nil(t, err)
|
||||
prefix[SegmentIndexMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
prefix[SegmentIndexMetaPrefix] = []string{string(value), string(value)}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
|
||||
// index
|
||||
prefix[IndexMetaPrefix] = []string{"index-meta"}
|
||||
_, err = NewMetaTable(k1)
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.IndexInfo{})
|
||||
assert.Nil(t, err)
|
||||
prefix[IndexMetaPrefix] = []string{string(value)}
|
||||
m1, err := NewMetaTable(k1)
|
||||
m1, err := NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
prefix[CollectionAliasMetaPrefix] = []string{"alias-meta"}
|
||||
|
@ -141,7 +174,7 @@ func Test_MockKV(t *testing.T) {
|
|||
k1.save = func(key string, value string, ts typeutil.Timestamp) error {
|
||||
return fmt.Errorf("save proxy error")
|
||||
}
|
||||
assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}, 0) })
|
||||
assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}) })
|
||||
//err = m1.AddProxy(&pb.ProxyMeta{}, 0)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "save proxy error")
|
||||
|
@ -185,7 +218,8 @@ func TestMetaTable(t *testing.T) {
|
|||
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, skv)
|
||||
mt, err := NewMetaTable(skv)
|
||||
txnKV := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath)
|
||||
mt, err := NewMetaTable(txnKV, skv)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collInfo := &pb.CollectionInfo{
|
||||
|
@ -274,7 +308,7 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Equal(t, collInfo.Schema.Fields[0].FieldID, field.FieldID)
|
||||
|
||||
// check DD operation flag
|
||||
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := mt.snapshot.Load(DDMsgSendPrefix, 0)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "false", flag)
|
||||
})
|
||||
|
@ -318,7 +352,7 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Equal(t, ts, collMeta.PartitionCreatedTimestamps[1])
|
||||
|
||||
// check DD operation flag
|
||||
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := mt.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "false", flag)
|
||||
})
|
||||
|
@ -332,15 +366,15 @@ func TestMetaTable(t *testing.T) {
|
|||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
}
|
||||
err = mt.AddIndex(&segIdxInfo, 0)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// it's legal to add index twice
|
||||
err = mt.AddIndex(&segIdxInfo, 0)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
segIdxInfo.BuildID = 202
|
||||
err = mt.AddIndex(&segIdxInfo, 0)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
|
||||
})
|
||||
|
@ -373,9 +407,9 @@ func TestMetaTable(t *testing.T) {
|
|||
IndexParams: params,
|
||||
}
|
||||
|
||||
_, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo, nil, 0)
|
||||
_, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo, nil)
|
||||
assert.NotNil(t, err)
|
||||
seg, field, err := mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
|
||||
seg, field, err := mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 1, len(seg))
|
||||
assert.Equal(t, segID2, seg[0])
|
||||
|
@ -391,7 +425,7 @@ func TestMetaTable(t *testing.T) {
|
|||
idxInfo.IndexID = 2001
|
||||
idxInfo.IndexName = "field110-1"
|
||||
|
||||
seg, field, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2}, 0)
|
||||
seg, field, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 2, len(seg))
|
||||
assert.Equal(t, segID, seg[0])
|
||||
|
@ -431,20 +465,20 @@ func TestMetaTable(t *testing.T) {
|
|||
po := pb.ProxyMeta{
|
||||
ID: 101,
|
||||
}
|
||||
err = mt.AddProxy(&po, 0)
|
||||
err = mt.AddProxy(&po)
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, err = NewMetaTable(skv)
|
||||
_, err = NewMetaTable(txnKV, skv)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("drop index", func(t *testing.T) {
|
||||
idx, ok, err := mt.DropIndex(collName, "field110", "field110", 0)
|
||||
idx, ok, err := mt.DropIndex(collName, "field110", "field110")
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, indexID, idx)
|
||||
|
||||
_, ok, err = mt.DropIndex(collName, "field110", "field110-error", 0)
|
||||
_, ok, err = mt.DropIndex(collName, "field110", "field110-error")
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, ok)
|
||||
|
||||
|
@ -468,7 +502,7 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Equal(t, partID, id)
|
||||
|
||||
// check DD operation flag
|
||||
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := mt.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "false", flag)
|
||||
})
|
||||
|
@ -487,14 +521,25 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
|
||||
// check DD operation flag
|
||||
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := mt.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "false", flag)
|
||||
})
|
||||
|
||||
/////////////////////////// these tests should run at last, it only used to hit the error lines ////////////////////////
|
||||
txnkv := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath)
|
||||
mockKV := &mockTestKV{}
|
||||
mt.client = mockKV
|
||||
mt.snapshot = mockKV
|
||||
mockTxnKV := &mockTestTxnKV{
|
||||
TxnKV: mt.txn,
|
||||
loadWithPrefix: func(key string) ([]string, []string, error) { return txnkv.LoadWithPrefix(key) },
|
||||
save: func(key, value string) error { return txnkv.Save(key, value) },
|
||||
multiSave: func(kvs map[string]string) error { return txnkv.MultiSave(kvs) },
|
||||
multiSaveAndRemoveWithPrefix: func(kvs map[string]string, removal []string) error {
|
||||
return txnkv.MultiSaveAndRemoveWithPrefix(kvs, removal)
|
||||
},
|
||||
}
|
||||
mt.txn = mockTxnKV
|
||||
|
||||
t.Run("add collection failed", func(t *testing.T) {
|
||||
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
|
||||
|
@ -507,9 +552,6 @@ func TestMetaTable(t *testing.T) {
|
|||
collInfo.PartitionNames = nil
|
||||
collInfo.PartitionCreatedTimestamps = nil
|
||||
assert.Panics(t, func() { mt.AddCollection(collInfo, 0, idxInfo, "") })
|
||||
//err = mt.AddCollection(collInfo, 0, idxInfo, nil)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "multi save error")
|
||||
})
|
||||
|
||||
t.Run("delete collection failed", func(t *testing.T) {
|
||||
|
@ -521,9 +563,6 @@ func TestMetaTable(t *testing.T) {
|
|||
}
|
||||
ts := ftso()
|
||||
assert.Panics(t, func() { mt.DeleteCollection(collInfo.ID, ts, "") })
|
||||
//err = mt.DeleteCollection(collInfo.ID, ts, nil)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "multi save and remove with prefix error")
|
||||
})
|
||||
|
||||
t.Run("get collection failed", func(t *testing.T) {
|
||||
|
@ -655,9 +694,7 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "partition abc does not exist")
|
||||
|
||||
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return fmt.Errorf("multi save and remove with prefix error")
|
||||
}
|
||||
mockKV.save = func(key, value string, ts typeutil.Timestamp) error { return errors.New("mocked error") }
|
||||
assert.Panics(t, func() { mt.DeletePartition(collInfo.ID, partName, ts, "") })
|
||||
//_, err = mt.DeletePartition(collInfo.ID, partName, ts, nil)
|
||||
//assert.NotNil(t, err)
|
||||
|
@ -697,13 +734,12 @@ func TestMetaTable(t *testing.T) {
|
|||
IndexID: indexID2,
|
||||
BuildID: buildID,
|
||||
}
|
||||
ts = ftso()
|
||||
err = mt.AddIndex(&segIdxInfo, ts)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
|
||||
|
||||
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
|
||||
err = mt.AddIndex(&segIdxInfo, ts)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
|
||||
|
||||
|
@ -718,14 +754,10 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
segIdxInfo.IndexID = indexID
|
||||
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.save = func(key string, value string) error {
|
||||
return fmt.Errorf("save error")
|
||||
}
|
||||
ts = ftso()
|
||||
assert.Panics(t, func() { mt.AddIndex(&segIdxInfo, ts) })
|
||||
//err = mt.AddIndex(&segIdxInfo, ts)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "save error")
|
||||
assert.Panics(t, func() { mt.AddIndex(&segIdxInfo) })
|
||||
})
|
||||
|
||||
t.Run("drop index failed", func(t *testing.T) {
|
||||
|
@ -748,17 +780,16 @@ func TestMetaTable(t *testing.T) {
|
|||
err = mt.AddCollection(collInfo, ts, idxInfo, "")
|
||||
assert.Nil(t, err)
|
||||
|
||||
ts = ftso()
|
||||
_, _, err = mt.DropIndex("abc", "abc", "abc", ts)
|
||||
_, _, err = mt.DropIndex("abc", "abc", "abc")
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "collection name = abc not exist")
|
||||
|
||||
mt.collName2ID["abc"] = 2
|
||||
_, _, err = mt.DropIndex("abc", "abc", "abc", ts)
|
||||
_, _, err = mt.DropIndex("abc", "abc", "abc")
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "collection name = abc not has meta")
|
||||
|
||||
_, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc", ts)
|
||||
_, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc")
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed abc", collInfo.Schema.Name))
|
||||
|
||||
|
@ -775,8 +806,7 @@ func TestMetaTable(t *testing.T) {
|
|||
}
|
||||
mt.collID2Meta[coll.ID] = coll
|
||||
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
|
||||
ts = ftso()
|
||||
idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts)
|
||||
idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
|
||||
assert.Zero(t, idxID)
|
||||
assert.False(t, isDroped)
|
||||
assert.Nil(t, err)
|
||||
|
@ -789,24 +819,20 @@ func TestMetaTable(t *testing.T) {
|
|||
ts = ftso()
|
||||
err = mt.AddCollection(collInfo, ts, idxInfo, "")
|
||||
assert.Nil(t, err)
|
||||
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error {
|
||||
return fmt.Errorf("multi save and remove with prefix error")
|
||||
}
|
||||
ts = ftso()
|
||||
assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts) })
|
||||
//_, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName, ts)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "multi save and remove with prefix error")
|
||||
assert.Panics(t, func() { mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) })
|
||||
})
|
||||
|
||||
t.Run("get segment index info by id", func(t *testing.T) {
|
||||
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSave = func(kvs map[string]string) error {
|
||||
return nil
|
||||
}
|
||||
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.save = func(key, value string) error {
|
||||
return nil
|
||||
}
|
||||
err := mt.reloadFromKV()
|
||||
|
@ -833,8 +859,7 @@ func TestMetaTable(t *testing.T) {
|
|||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
}
|
||||
ts = ftso()
|
||||
err = mt.AddIndex(&segIdxInfo, ts)
|
||||
err = mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName)
|
||||
assert.Nil(t, err)
|
||||
|
@ -921,7 +946,7 @@ func TestMetaTable(t *testing.T) {
|
|||
}
|
||||
|
||||
mt.collName2ID["abc"] = 123
|
||||
_, _, err = mt.GetNotIndexedSegments("abc", "no-field", idx, nil, 0)
|
||||
_, _, err = mt.GetNotIndexedSegments("abc", "no-field", idx, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "collection abc not found")
|
||||
|
||||
|
@ -941,26 +966,26 @@ func TestMetaTable(t *testing.T) {
|
|||
err = mt.AddCollection(collInfo, ts, idxInfo, "")
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil, 0)
|
||||
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed no-field", collInfo.Schema.Name))
|
||||
|
||||
bakMeta := mt.indexID2Meta
|
||||
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
|
||||
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0)
|
||||
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
|
||||
mt.indexID2Meta = bakMeta
|
||||
|
||||
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSave = func(kvs map[string]string) error {
|
||||
return fmt.Errorf("multi save error")
|
||||
}
|
||||
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) })
|
||||
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
|
||||
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "multi save error")
|
||||
|
||||
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSave = func(kvs map[string]string) error {
|
||||
return nil
|
||||
}
|
||||
collInfo.PartitionIDs = nil
|
||||
|
@ -986,10 +1011,10 @@ func TestMetaTable(t *testing.T) {
|
|||
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
|
||||
|
||||
idx.IndexName = idxInfo[0].IndexName
|
||||
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSave = func(kvs map[string]string) error {
|
||||
return fmt.Errorf("multi save error")
|
||||
}
|
||||
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil, 0) })
|
||||
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
|
||||
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
|
||||
//assert.NotNil(t, err)
|
||||
//assert.EqualError(t, err, "multi save error")
|
||||
|
@ -1007,10 +1032,10 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "collection abc not found")
|
||||
|
||||
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.multiSave = func(kvs map[string]string) error {
|
||||
return nil
|
||||
}
|
||||
mockKV.save = func(key string, value string, ts typeutil.Timestamp) error {
|
||||
mockTxnKV.save = func(key string, value string) error {
|
||||
return nil
|
||||
}
|
||||
err = mt.reloadFromKV()
|
||||
|
@ -1063,7 +1088,8 @@ func TestMetaWithTimestamp(t *testing.T) {
|
|||
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, skv)
|
||||
mt, err := NewMetaTable(skv)
|
||||
txnKV := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath)
|
||||
mt, err := NewMetaTable(txnKV, skv)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collInfo := &pb.CollectionInfo{
|
||||
|
|
|
@ -366,8 +366,7 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
|
|||
if info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
}
|
||||
ts, _ := c.TSOAllocator(1)
|
||||
if err := c.MetaTable.AddIndex(&info, ts); err != nil {
|
||||
if err := c.MetaTable.AddIndex(&info); err != nil {
|
||||
log.Debug("Add index into meta table failed",
|
||||
zap.Int64("collection_id", collMeta.ID),
|
||||
zap.Int64("index_id", info.IndexID),
|
||||
|
@ -403,7 +402,7 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t
|
|||
}
|
||||
|
||||
func (c *Core) setDdMsgSendFlag(b bool) error {
|
||||
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -413,8 +412,7 @@ func (c *Core) setDdMsgSendFlag(b bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
ts, _ := c.TSOAllocator(1)
|
||||
err = c.MetaTable.client.Save(DDMsgSendPrefix, strconv.FormatBool(b), ts)
|
||||
err = c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(b))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -876,7 +874,7 @@ func (c *Core) Init() error {
|
|||
log.Error("RootCoord, Failed to new suffixSnapshot", zap.Error(initError))
|
||||
return initError
|
||||
}
|
||||
if c.MetaTable, initError = NewMetaTable(ss); initError != nil {
|
||||
if c.MetaTable, initError = NewMetaTable(metaKV, ss); initError != nil {
|
||||
log.Error("RootCoord, Failed to new MetaTable", zap.Any("reason", initError))
|
||||
return initError
|
||||
}
|
||||
|
@ -969,14 +967,14 @@ func (c *Core) Init() error {
|
|||
|
||||
func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
if !force {
|
||||
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
if err != nil || flag == "true" {
|
||||
log.Debug("No un-successful DdMsg")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix, 0)
|
||||
ddOpStr, err := c.MetaTable.txn.Load(DDOperationPrefix)
|
||||
if err != nil {
|
||||
log.Debug("DdOperation key does not exist")
|
||||
return nil
|
||||
|
@ -1937,8 +1935,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
|
|||
log.Error("build index fail", zap.Int64("buildid", info.BuildID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
ts, _ := c.TSOAllocator(1)
|
||||
err = c.MetaTable.AddIndex(&info, ts)
|
||||
err = c.MetaTable.AddIndex(&info)
|
||||
if err != nil {
|
||||
log.Error("AddIndex fail", zap.String("err", err.Error()))
|
||||
}
|
||||
|
|
|
@ -719,10 +719,10 @@ func TestRootCoord(t *testing.T) {
|
|||
core.chanTimeTick.lock.Unlock()
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "true", flag)
|
||||
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
|
||||
ddOpStr, err := core.MetaTable.txn.Load(DDOperationPrefix)
|
||||
assert.Nil(t, err)
|
||||
var ddOp DdOperation
|
||||
err = DecodeDdOperation(ddOpStr, &ddOp)
|
||||
|
@ -889,10 +889,10 @@ func TestRootCoord(t *testing.T) {
|
|||
assert.Equal(t, collName, pnm.GetCollArray()[0])
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "true", flag)
|
||||
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
|
||||
ddOpStr, err := core.MetaTable.txn.Load(DDOperationPrefix)
|
||||
assert.Nil(t, err)
|
||||
var ddOp DdOperation
|
||||
err = DecodeDdOperation(ddOpStr, &ddOp)
|
||||
|
@ -1224,10 +1224,10 @@ func TestRootCoord(t *testing.T) {
|
|||
assert.Equal(t, collName, pnm.GetCollArray()[1])
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "true", flag)
|
||||
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
|
||||
ddOpStr, err := core.MetaTable.txn.Load(DDOperationPrefix)
|
||||
assert.Nil(t, err)
|
||||
var ddOp DdOperation
|
||||
err = DecodeDdOperation(ddOpStr, &ddOp)
|
||||
|
@ -1314,10 +1314,10 @@ func TestRootCoord(t *testing.T) {
|
|||
assert.Equal(t, collName, collArray[2])
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "true", flag)
|
||||
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
|
||||
ddOpStr, err := core.MetaTable.txn.Load(DDOperationPrefix)
|
||||
assert.Nil(t, err)
|
||||
var ddOp DdOperation
|
||||
err = DecodeDdOperation(ddOpStr, &ddOp)
|
||||
|
|
|
@ -855,7 +855,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo, flushedSegs, t.Req.Base.GetTimestamp())
|
||||
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo, flushedSegs)
|
||||
if err != nil {
|
||||
log.Debug("RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
|
||||
return err
|
||||
|
@ -880,8 +880,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
|
|||
if info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
}
|
||||
ts, _ := t.core.TSOAllocator(1)
|
||||
if err := t.core.MetaTable.AddIndex(&info, ts); err != nil {
|
||||
if err := t.core.MetaTable.AddIndex(&info); err != nil {
|
||||
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -958,8 +957,7 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ts, _ := t.core.TSOAllocator(1)
|
||||
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName, ts)
|
||||
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue