query master service's with timestamp (#5275)

let meta support snapshot
so collection could query meta with timestamp

Resolves: #5219

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/5212/head
neza2017 2021-05-18 14:18:02 +08:00 committed by GitHub
parent c775744136
commit 49f6542b1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1304 additions and 367 deletions

View File

@ -360,7 +360,7 @@ func TestGrpcService(t *testing.T) {
})
t.Run("describe collection", func(t *testing.T) {
collMeta, err := core.MetaTable.GetCollectionByName("testColl")
collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
@ -411,10 +411,10 @@ func TestGrpcService(t *testing.T) {
status, err := cli.CreatePartition(ctx, req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
collMeta, err := core.MetaTable.GetCollectionByName("testColl")
collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, len(collMeta.PartitionIDs), 2)
partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[1])
partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[1], 0)
assert.Nil(t, err)
assert.Equal(t, partMeta.PartitionName, "testPartition")
@ -440,7 +440,7 @@ func TestGrpcService(t *testing.T) {
})
t.Run("show partition", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
req := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
@ -461,10 +461,10 @@ func TestGrpcService(t *testing.T) {
})
t.Run("show segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
part, err := core.MetaTable.GetPartitionByID(partID)
part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Zero(t, len(part.SegmentIDs))
seg := &datapb.SegmentInfo{
@ -474,7 +474,7 @@ func TestGrpcService(t *testing.T) {
}
core.DataServiceSegmentChan <- seg
time.Sleep(time.Millisecond * 100)
part, err = core.MetaTable.GetPartitionByID(partID)
part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, len(part.SegmentIDs), 1)
@ -513,13 +513,13 @@ func TestGrpcService(t *testing.T) {
},
},
}
collMeta, err := core.MetaTable.GetCollectionByName("testColl")
collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, len(collMeta.FieldIndexes), 0)
rsp, err := cli.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_Success)
collMeta, err = core.MetaTable.GetCollectionByName("testColl")
collMeta, err = core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, len(collMeta.FieldIndexes), 1)
@ -535,7 +535,7 @@ func TestGrpcService(t *testing.T) {
})
t.Run("describe segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
req := &milvuspb.DescribeSegmentRequest{
@ -575,10 +575,10 @@ func TestGrpcService(t *testing.T) {
})
t.Run("flush segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName("testColl")
coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
part, err := core.MetaTable.GetPartitionByID(partID)
part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, len(part.SegmentIDs), 1)
seg := &datapb.SegmentInfo{
@ -588,7 +588,7 @@ func TestGrpcService(t *testing.T) {
}
core.DataServiceSegmentChan <- seg
time.Sleep(time.Millisecond * 100)
part, err = core.MetaTable.GetPartitionByID(partID)
part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, len(part.SegmentIDs), 2)
core.DataNodeSegmentFlushCompletedChan <- 1001
@ -656,10 +656,10 @@ func TestGrpcService(t *testing.T) {
status, err := cli.DropPartition(ctx, req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
collMeta, err := core.MetaTable.GetCollectionByName("testColl")
collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, len(collMeta.PartitionIDs), 1)
partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[0])
partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[0], 0)
assert.Nil(t, err)
assert.Equal(t, partMeta.PartitionName, cms.Params.DefaultPartitionName)
assert.Equal(t, 2, len(collectionMetaCache))

View File

@ -11,6 +11,10 @@
package kv
import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type BaseKV interface {
Load(key string) (string, error)
MultiLoad(keys []string) ([]string, error)
@ -30,3 +34,11 @@ type TxnKV interface {
MultiRemoveWithPrefix(keys []string) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
}
type SnapShotKV interface {
Save(key, value string) (typeutil.Timestamp, error)
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string) (typeutil.Timestamp, error)
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error)
}

View File

@ -90,7 +90,6 @@ type Core struct {
cancel context.CancelFunc
etcdCli *clientv3.Client
kvBase *etcdkv.EtcdKV
metaKV *etcdkv.EtcdKV
//setMsgStreams, receive time tick from proxy service time tick channel
ProxyTimeTickChan chan typeutil.Timestamp
@ -191,9 +190,6 @@ func (c *Core) checkInit() error {
if c.etcdCli == nil {
return fmt.Errorf("etcdCli is nil")
}
if c.metaKV == nil {
return fmt.Errorf("metaKV is nil")
}
if c.kvBase == nil {
return fmt.Errorf("kvBase is nil")
}
@ -307,7 +303,7 @@ func (c *Core) startDataServiceSegmentLoop() {
}
if seg == nil {
log.Warn("segment from data service is nil")
} else if err := c.MetaTable.AddSegment(seg); err != nil {
} else if _, err := c.MetaTable.AddSegment(seg); err != nil {
//what if master add segment failed, but data service success?
log.Warn("add segment info meta table failed ", zap.String("error", err.Error()))
} else {
@ -387,7 +383,7 @@ func (c *Core) tsLoop() {
}
func (c *Core) setDdMsgSendFlag(b bool) error {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
if err != nil {
return err
}
@ -398,9 +394,11 @@ func (c *Core) setDdMsgSendFlag(b bool) error {
}
if b {
return c.MetaTable.client.Save(DDMsgSendPrefix, "true")
_, err = c.MetaTable.client.Save(DDMsgSendPrefix, "true")
return err
}
return c.MetaTable.client.Save(DDMsgSendPrefix, "false")
_, err = c.MetaTable.client.Save(DDMsgSendPrefix, "false")
return err
}
func (c *Core) setMsgStreams() error {
@ -808,7 +806,7 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema,
BuildID: bldID,
EnableIndex: enableIdx,
}
err = c.MetaTable.AddIndex(&seg)
_, err = c.MetaTable.AddIndex(&seg)
return err
}
@ -819,8 +817,23 @@ func (c *Core) Init() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil {
return initError
}
c.metaKV = etcdkv.NewEtcdKV(c.etcdCli, Params.MetaRootPath)
if c.MetaTable, initError = NewMetaTable(c.metaKV); initError != nil {
tsAlloc := func() typeutil.Timestamp {
for {
var ts typeutil.Timestamp
var err error
if ts, err = c.tsoAllocator(1); err == nil {
return ts
}
time.Sleep(100 * time.Millisecond)
log.Debug("alloc time stamp error", zap.Error(err))
}
}
var ms *metaSnapshot
ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024, tsAlloc)
if initError != nil {
return initError
}
if c.MetaTable, initError = NewMetaTable(ms); initError != nil {
return initError
}
c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
@ -876,13 +889,13 @@ func (c *Core) Init() error {
}
func (c *Core) reSendDdMsg(ctx context.Context) error {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
if err != nil || flag == "true" {
log.Debug("No un-successful DdMsg")
return nil
}
ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix)
ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix, 0)
if err != nil {
log.Debug("DdOperation key does not exist")
return nil
@ -1638,27 +1651,29 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste
}
func (c *Core) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := c.metaKV.Grant(5)
respID, err := c.etcdCli.Grant(c.ctx, 5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
c.session.NodeName = nodeName
c.session.IP = ip
c.session.LeaseID = respID
c.session.LeaseID = respID.ID
sessionJSON, err := json.Marshal(c.session)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(c.ctx, RequestTimeout)
defer cancel()
err = c.metaKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
_, err = c.etcdCli.Put(ctx, fmt.Sprintf("%s/node/%s", Params.MetaRootPath, nodeName), string(sessionJSON), clientv3.WithLease(respID.ID))
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := c.metaKV.KeepAlive(respID)
ch, err := c.etcdCli.KeepAlive(c.ctx, respID.ID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err

View File

@ -358,7 +358,7 @@ func TestMasterService(t *testing.T) {
createMsg, ok := (msg.Msgs[0]).(*msgstream.CreateCollectionMsg)
assert.True(t, ok)
createMeta, err := core.MetaTable.GetCollectionByName(collName)
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, createMsg.CollectionID)
assert.Equal(t, 1, len(createMeta.PartitionIDs))
@ -414,15 +414,15 @@ func TestMasterService(t *testing.T) {
assert.True(t, ok)
createMsg, ok = (msg.Msgs[0]).(*msgstream.CreateCollectionMsg)
assert.True(t, ok)
createMeta, err = core.MetaTable.GetCollectionByName("testColl-again")
createMeta, err = core.MetaTable.GetCollectionByName("testColl-again", 0)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, createMsg.CollectionID)
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
@ -490,7 +490,7 @@ func TestMasterService(t *testing.T) {
})
t.Run("describe collection", func(t *testing.T) {
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
@ -544,10 +544,10 @@ func TestMasterService(t *testing.T) {
status, err := core.CreatePartition(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 2, len(collMeta.PartitionIDs))
partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[1])
partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[1], 0)
assert.Nil(t, err)
assert.Equal(t, partName, partMeta.PartitionName)
@ -563,10 +563,10 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, collName, pm.GetCollArray()[0])
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
@ -599,7 +599,7 @@ func TestMasterService(t *testing.T) {
})
t.Run("show partition", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName)
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
req := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
@ -620,10 +620,10 @@ func TestMasterService(t *testing.T) {
})
t.Run("show segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName)
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
part, err := core.MetaTable.GetPartitionByID(partID)
part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Zero(t, len(part.SegmentIDs))
@ -656,7 +656,7 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err)
time.Sleep(time.Second)
part, err = core.MetaTable.GetPartitionByID(partID)
part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(part.SegmentIDs))
@ -695,7 +695,7 @@ func TestMasterService(t *testing.T) {
},
},
}
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 0, len(collMeta.FieldIndexes))
@ -706,7 +706,7 @@ func TestMasterService(t *testing.T) {
files := im.getFileArray()
assert.Equal(t, 3, len(files))
assert.ElementsMatch(t, files, []string{"file0-100", "file1-100", "file2-100"})
collMeta, err = core.MetaTable.GetCollectionByName(collName)
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(collMeta.FieldIndexes))
idxMeta, err := core.MetaTable.GetIndexByID(collMeta.FieldIndexes[0].IndexID)
@ -720,7 +720,7 @@ func TestMasterService(t *testing.T) {
})
t.Run("describe segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName)
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
req := &milvuspb.DescribeSegmentRequest{
@ -780,10 +780,10 @@ func TestMasterService(t *testing.T) {
})
t.Run("flush segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName)
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
part, err := core.MetaTable.GetPartitionByID(partID)
part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(part.SegmentIDs))
@ -816,7 +816,7 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err)
time.Sleep(time.Second)
part, err = core.MetaTable.GetPartitionByID(partID)
part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, 2, len(part.SegmentIDs))
@ -875,7 +875,7 @@ func TestMasterService(t *testing.T) {
},
}
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(collMeta.FieldIndexes))
oldIdx := collMeta.FieldIndexes[0].IndexID
@ -885,7 +885,7 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
time.Sleep(time.Second)
collMeta, err = core.MetaTable.GetCollectionByName(collName)
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 2, len(collMeta.FieldIndexes))
assert.Equal(t, oldIdx, collMeta.FieldIndexes[0].IndexID)
@ -943,16 +943,16 @@ func TestMasterService(t *testing.T) {
CollectionName: collName,
PartitionName: partName,
}
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dropPartID := collMeta.PartitionIDs[1]
status, err := core.DropPartition(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err = core.MetaTable.GetCollectionByName(collName)
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(collMeta.PartitionIDs))
partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[0])
partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[0], 0)
assert.Nil(t, err)
assert.Equal(t, Params.DefaultPartitionName, partMeta.PartitionName)
@ -968,10 +968,10 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, collName, pm.GetCollArray()[1])
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
@ -996,7 +996,7 @@ func TestMasterService(t *testing.T) {
DbName: dbName,
CollectionName: collName,
}
collMeta, err := core.MetaTable.GetCollectionByName(collName)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
status, err := core.DropCollection(ctx, req)
assert.Nil(t, err)
@ -1042,10 +1042,10 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, collName, collArray[2])
// check DD operation info
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix)
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "true", flag)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix)
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
@ -1657,10 +1657,6 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.metaKV = &etcdkv.EtcdKV{}
err = c.checkInit()
assert.NotNil(t, err)
c.kvBase = &etcdkv.EtcdKV{}
err = c.checkInit()
assert.NotNil(t, err)

View File

@ -0,0 +1,395 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package masterservice
import (
"context"
"fmt"
"path"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
const (
RequestTimeout = 10 * time.Second
)
type rtPair struct {
rev int64
ts typeutil.Timestamp
}
type metaSnapshot struct {
cli *clientv3.Client
root string
tsKey string
lock sync.RWMutex
timeAllactor func() typeutil.Timestamp
ts2Rev []rtPair
minPos int
maxPos int
numTs int
}
func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int, timeAllactor func() typeutil.Timestamp) (*metaSnapshot, error) {
if bufSize <= 0 {
bufSize = 1024
}
ms := &metaSnapshot{
cli: cli,
root: root,
tsKey: tsKey,
lock: sync.RWMutex{},
timeAllactor: timeAllactor,
ts2Rev: make([]rtPair, bufSize),
minPos: 0,
maxPos: 0,
numTs: 0,
}
if err := ms.loadTs(); err != nil {
return nil, err
}
return ms, nil
}
func (ms *metaSnapshot) loadTs() error {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
key := path.Join(ms.root, ms.tsKey)
resp, err := ms.cli.Get(ctx, key)
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
version := resp.Kvs[0].Version
revision := resp.Kvs[0].ModRevision
strTs := string(resp.Kvs[0].Value)
ts, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
log.Info("load last ts", zap.Int64("version", version), zap.Int64("revision", revision))
ms.initTs(revision, ts)
for version--; version > 0; version-- {
if ms.numTs == len(ms.ts2Rev) {
break
}
revision--
resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision))
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
curVer := resp.Kvs[0].Version
curRev := resp.Kvs[0].ModRevision
if curVer > version {
return nil
}
strTs := string(resp.Kvs[0].Value)
curTs, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
if curTs >= ts {
return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts)
}
ms.initTs(curRev, curTs)
ts = curTs
revision = curRev
}
return nil
}
func (ms *metaSnapshot) maxTs() typeutil.Timestamp {
return ms.ts2Rev[ms.maxPos].ts
}
func (ms *metaSnapshot) minTs() typeutil.Timestamp {
return ms.ts2Rev[ms.minPos].ts
}
func (ms *metaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
log.Debug("init meta snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
if ms.numTs == 0 {
ms.maxPos = len(ms.ts2Rev) - 1
ms.minPos = len(ms.ts2Rev) - 1
ms.numTs = 1
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
} else if ms.numTs < len(ms.ts2Rev) {
ms.minPos--
ms.numTs++
ms.ts2Rev[ms.minPos].rev = rev
ms.ts2Rev[ms.minPos].ts = ts
}
}
func (ms *metaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
log.Debug("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
ms.maxPos++
if ms.maxPos == len(ms.ts2Rev) {
ms.maxPos = 0
}
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
if ms.numTs < len(ms.ts2Rev) {
ms.numTs++
} else {
ms.minPos++
if ms.minPos == len(ms.ts2Rev) {
ms.minPos = 0
}
}
}
func (ms *metaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 {
if length == 1 {
return ms.ts2Rev[start].rev
}
begin := start
end := begin + length
mid := (begin + end) / 2
for {
if ms.ts2Rev[mid].ts == ts {
return ms.ts2Rev[mid].rev
}
if mid == begin {
if ms.ts2Rev[mid].ts < ts || mid == start {
return ms.ts2Rev[mid].rev
}
return ms.ts2Rev[mid-1].rev
}
if ms.ts2Rev[mid].ts > ts {
end = mid
} else if ms.ts2Rev[mid].ts < ts {
begin = mid + 1
}
mid = (begin + end) / 2
}
}
func (ms *metaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
if ms.numTs == 0 {
return 0
}
if ts >= ms.ts2Rev[ms.maxPos].ts {
return ms.ts2Rev[ms.maxPos].rev
}
if ts < ms.ts2Rev[ms.minPos].ts {
return 0
}
if ms.maxPos > ms.minPos {
return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1)
}
topVal := ms.ts2Rev[len(ms.ts2Rev)-1]
botVal := ms.ts2Rev[0]
minVal := ms.ts2Rev[ms.minPos]
maxVal := ms.ts2Rev[ms.maxPos]
if ts >= topVal.ts && ts < botVal.ts {
return topVal.rev
} else if ts >= minVal.ts && ts < topVal.ts {
return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos)
} else if ts >= botVal.ts && ts < maxVal.ts {
return ms.searchOnCache(ts, 0, ms.maxPos+1)
}
return 0
}
func (ms *metaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
if rev < 2 {
return 0
}
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
for rev--; rev >= 2; rev-- {
resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev))
if err != nil {
log.Debug("get ts from etcd failed", zap.Error(err))
return 0
}
if len(resp.Kvs) <= 0 {
return 0
}
rev = resp.Kvs[0].ModRevision
curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
log.Debug("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err))
return 0
}
if curTs <= ts {
return rev
}
}
return 0
}
func (ms *metaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
rev := ms.getRevOnCache(ts)
if rev > 0 {
return rev, nil
}
rev = ms.ts2Rev[ms.minPos].rev
rev = ms.getRevOnEtcd(ts, rev)
if rev > 0 {
return rev, nil
}
return 0, fmt.Errorf("can't find revision on ts=%d", ts)
}
func (ms *metaSnapshot) Save(key, value string) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
resp, err := ms.cli.Txn(ctx).If().Then(
clientv3.OpPut(path.Join(ms.root, key), value),
clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs),
).Commit()
if err != nil {
return 0, err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
}
func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key))
if err != nil {
return "", err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return "", err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev))
if err != nil {
return "", err
}
}
if len(resp.Kvs) == 0 {
return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts)
}
return string(resp.Kvs[0].Value), nil
}
func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(kvs)+1)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return 0, err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
}
func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return nil, nil, err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
}
keys := make([]string, 0, len(resp.Kvs))
values := make([]string, 0, len(resp.Kvs))
tk := path.Join(ms.root, "k")
prefixLen := len(tk) - 1
for _, kv := range resp.Kvs {
tk = string(kv.Key)
tk = tk[prefixLen:]
keys = append(keys, tk)
values = append(values, string(kv.Value))
}
return keys, values, nil
}
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
for _, key := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key)))
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return 0, err
}
ms.putTs(resp.Header.Revision, ts)
return ts, nil
}

View File

@ -0,0 +1,398 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package masterservice
import (
"context"
"fmt"
"math/rand"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3"
)
func TestMetaSnapshot(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 8; i++ {
vtso = typeutil.Timestamp(100 + i)
ts, err := ms.Save("abc", fmt.Sprintf("value-%d", i))
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
_, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i))
assert.Nil(t, err)
}
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
}
func TestSearchOnCache(t *testing.T) {
ms := &metaSnapshot{}
for i := 0; i < 8; i++ {
ms.ts2Rev = append(ms.ts2Rev,
rtPair{
rev: int64(i * 2),
ts: typeutil.Timestamp(i * 2),
})
}
rev := ms.searchOnCache(9, 0, 8)
assert.Equal(t, int64(8), rev)
rev = ms.searchOnCache(1, 0, 2)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(1, 0, 8)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(14, 0, 8)
assert.Equal(t, int64(14), rev)
rev = ms.searchOnCache(0, 0, 8)
assert.Equal(t, int64(0), rev)
}
func TestGetRevOnCache(t *testing.T) {
ms := &metaSnapshot{}
ms.ts2Rev = make([]rtPair, 7)
ms.initTs(7, 16)
ms.initTs(6, 14)
ms.initTs(5, 12)
ms.initTs(4, 10)
var rev int64
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(0), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
ms.initTs(3, 8)
ms.initTs(2, 6)
assert.Equal(t, ms.maxPos, 6)
assert.Equal(t, ms.minPos, 1)
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(3), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
rev = ms.getRevOnCache(5)
assert.Equal(t, int64(0), rev)
ms.putTs(8, 18)
assert.Equal(t, ms.maxPos, 0)
assert.Equal(t, ms.minPos, 1)
for rev = 2; rev <= 7; rev++ {
ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3))
assert.Equal(t, rev, ts)
}
ms.putTs(9, 20)
assert.Equal(t, ms.maxPos, 1)
assert.Equal(t, ms.minPos, 2)
assert.Equal(t, ms.numTs, 7)
curMax := ms.maxPos
curMin := ms.minPos
for i := 10; i < 20; i++ {
ms.putTs(int64(i), typeutil.Timestamp(i*2+2))
curMax++
curMin++
if curMax == len(ms.ts2Rev) {
curMax = 0
}
if curMin == len(ms.ts2Rev) {
curMin = 0
}
assert.Equal(t, curMax, ms.maxPos)
assert.Equal(t, curMin, ms.minPos)
}
for i := 13; i < 20; i++ {
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2))
assert.Equal(t, int64(i), rev)
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3))
assert.Equal(t, int64(i), rev)
}
rev = ms.getRevOnCache(27)
assert.Zero(t, rev)
}
func TestGetRevOnEtcd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
key := path.Join(rootPath, tsKey)
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
defer etcdCli.Close()
ms := metaSnapshot{
cli: etcdCli,
root: rootPath,
tsKey: tsKey,
}
resp, err := etcdCli.Put(ctx, key, "100")
assert.Nil(t, err)
revList := []int64{}
tsList := []typeutil.Timestamp{}
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, 100)
for i := 110; i < 200; i += 10 {
resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i))
assert.Nil(t, err)
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, typeutil.Timestamp(i))
}
lastRev := revList[len(revList)-1] + 1
for i, ts := range tsList {
rev := ms.getRevOnEtcd(ts, lastRev)
assert.Equal(t, revList[i], rev)
}
for i := 0; i < len(tsList); i++ {
rev := ms.getRevOnEtcd(tsList[i]+5, lastRev)
assert.Equal(t, revList[i], rev)
}
rev := ms.getRevOnEtcd(200, lastRev)
assert.Equal(t, lastRev-1, rev)
rev = ms.getRevOnEtcd(99, lastRev)
assert.Zero(t, rev)
}
func TestLoad(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.Save("key", fmt.Sprintf("value-%d", i))
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
val, err := ms.Load("key", 0)
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
}
func TestMultiSave(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSave(saves)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
keys, vals, err := ms.LoadWithPrefix("k", 0)
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
}
func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
defer etcdCli.Close()
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.Save(fmt.Sprintf("kd-%d", i), fmt.Sprintf("value-%d", i))
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 20; i < 40; i++ {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
}

View File

@ -39,6 +39,8 @@ const (
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
IndexMetaPrefix = ComponentPrefix + "/index"
TimestampPrefix = ComponentPrefix + "/timestamp"
DDOperationPrefix = ComponentPrefix + "/dd-operation"
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
@ -49,7 +51,7 @@ const (
)
type metaTable struct {
client kv.TxnKV // client of a reliable kv service, i.e. etcd client
client kv.SnapShotKV // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta
@ -68,7 +70,7 @@ type metaTable struct {
ddLock sync.RWMutex
}
func NewMetaTable(kv kv.TxnKV) (*metaTable, error) {
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
mt := &metaTable{
client: kv,
tenantLock: sync.RWMutex{},
@ -97,7 +99,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.flushedSegID = make(map[typeutil.UniqueID]bool)
mt.vChan2Chan = make(map[string]string)
_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix)
_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
if err != nil {
return err
}
@ -111,7 +113,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
}
_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix)
_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
if err != nil {
return err
}
@ -125,7 +127,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
}
_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix)
_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
if err != nil {
return err
}
@ -147,7 +149,7 @@ func (mt *metaTable) reloadFromKV() error {
}
}
_, values, err = mt.client.LoadWithPrefix(PartitionMetaPrefix)
_, values, err = mt.client.LoadWithPrefix(PartitionMetaPrefix, 0)
if err != nil {
return err
}
@ -170,7 +172,7 @@ func (mt *metaTable) reloadFromKV() error {
}
}
_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix)
_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
if err != nil {
return err
}
@ -190,7 +192,7 @@ func (mt *metaTable) reloadFromKV() error {
}
}
_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix)
_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
if err != nil {
return err
}
@ -206,49 +208,51 @@ func (mt *metaTable) reloadFromKV() error {
return nil
}
func (mt *metaTable) AddTenant(te *pb.TenantMeta) error {
func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) {
mt.tenantLock.Lock()
defer mt.tenantLock.Unlock()
k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
v := proto.MarshalTextString(te)
if err := mt.client.Save(k, v); err != nil {
return err
ts, err := mt.client.Save(k, v)
if err != nil {
return 0, err
}
mt.tenantID2Meta[te.ID] = *te
return nil
return ts, nil
}
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error {
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) {
mt.proxyLock.Lock()
defer mt.proxyLock.Unlock()
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
v := proto.MarshalTextString(po)
if err := mt.client.Save(k, v); err != nil {
return err
ts, err := mt.client.Save(k, v)
if err != nil {
return 0, err
}
mt.proxyID2Meta[po.ID] = *po
return nil
return ts, nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) error {
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if len(part.SegmentIDs) != 0 {
return errors.New("segment should be empty when creating collection")
return 0, errors.New("segment should be empty when creating collection")
}
if len(coll.PartitionIDs) != 0 {
return errors.New("partitions should be empty when creating collection")
return 0, errors.New("partitions should be empty when creating collection")
}
if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
return fmt.Errorf("collection %s exist", coll.Schema.Name)
return 0, fmt.Errorf("collection %s exist", coll.Schema.Name)
}
if len(coll.FieldIndexes) != len(idx) {
return fmt.Errorf("incorrect index id when creating collection")
return 0, fmt.Errorf("incorrect index id when creating collection")
}
coll.PartitionIDs = append(coll.PartitionIDs, part.PartitionID)
@ -281,22 +285,22 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err := mt.client.MultiSave(meta)
ts, err := mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
return 0, err
}
return nil
return ts, nil
}
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) error {
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return fmt.Errorf("can't find collection. id = %d", collID)
return 0, fmt.Errorf("can't find collection. id = %d", collID)
}
delete(mt.collID2Meta, collID)
@ -347,49 +351,84 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string)
DDMsgSendPrefix: "false",
}
err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
if err != nil {
_ = mt.reloadFromKV()
return err
return 0, err
}
return nil
return ts, nil
}
func (mt *metaTable) HasCollection(collID typeutil.UniqueID) bool {
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, ok := mt.collID2Meta[collID]
return ok
if ts == 0 {
_, ok := mt.collID2Meta[collID]
return ok
}
key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
_, err := mt.client.Load(key, ts)
return err == nil
}
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID) (*pb.CollectionInfo, error) {
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
col, ok := mt.collID2Meta[collectionID]
if !ok {
return nil, fmt.Errorf("can't find collection id : %d", collectionID)
if ts == 0 {
col, ok := mt.collID2Meta[collectionID]
if !ok {
return nil, fmt.Errorf("can't find collection id : %d", collectionID)
}
colCopy := proto.Clone(&col)
return colCopy.(*pb.CollectionInfo), nil
}
colCopy := proto.Clone(&col)
return colCopy.(*pb.CollectionInfo), nil
key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
val, err := mt.client.Load(key, ts)
if err != nil {
return nil, err
}
colMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(val, &colMeta)
if err != nil {
return nil, err
}
return &colMeta, nil
}
func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionInfo, error) {
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
vid, ok := mt.collName2ID[collectionName]
if !ok {
return nil, fmt.Errorf("can't find collection: " + collectionName)
if ts == 0 {
vid, ok := mt.collName2ID[collectionName]
if !ok {
return nil, fmt.Errorf("can't find collection: " + collectionName)
}
col, ok := mt.collID2Meta[vid]
if !ok {
return nil, fmt.Errorf("can't find collection: " + collectionName)
}
colCopy := proto.Clone(&col)
return colCopy.(*pb.CollectionInfo), nil
}
col, ok := mt.collID2Meta[vid]
if !ok {
return nil, fmt.Errorf("can't find collection: " + collectionName)
_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
if err != nil {
return nil, err
}
colCopy := proto.Clone(&col)
return colCopy.(*pb.CollectionInfo), nil
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(val, &collMeta)
if err != nil {
log.Debug("unmarshal collection info failed", zap.Error(err))
continue
}
if collMeta.Schema.Name == collectionName {
return &collMeta, nil
}
}
return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
}
func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error) {
@ -408,28 +447,44 @@ func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.Coll
return colCopy.(*pb.CollectionInfo), nil
}
func (mt *metaTable) ListCollections() ([]string, error) {
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) ([]string, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
colls := make([]string, 0, len(mt.collName2ID))
for name := range mt.collName2ID {
colls = append(colls, name)
if ts == 0 {
colls := make([]string, 0, len(mt.collName2ID))
for name := range mt.collName2ID {
colls = append(colls, name)
}
return colls, nil
}
_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
if err != nil {
return nil, err
}
colls := make([]string, 0, len(vals))
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err := proto.UnmarshalText(val, &collMeta)
if err != nil {
log.Debug("unmarshal collection info failed", zap.Error(err))
}
colls = append(colls, collMeta.Schema.Name)
}
return colls, nil
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) error {
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
if !ok {
return fmt.Errorf("can't find collection. id = %d", collID)
return 0, fmt.Errorf("can't find collection. id = %d", collID)
}
// number of partition tags (except _default) should be limited to 4096 by default
if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
}
for _, t := range coll.PartitionIDs {
part, ok := mt.partitionID2Meta[t]
@ -438,10 +493,10 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
continue
}
if part.PartitionName == partitionName {
return fmt.Errorf("partition name = %s already exists", partitionName)
return 0, fmt.Errorf("partition name = %s already exists", partitionName)
}
if part.PartitionID == partitionID {
return fmt.Errorf("partition id = %d already exists", partitionID)
return 0, fmt.Errorf("partition id = %d already exists", partitionID)
}
}
partMeta := pb.PartitionInfo{
@ -464,52 +519,83 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err := mt.client.MultiSave(meta)
ts, err := mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return err
return 0, err
}
return nil
return ts, nil
}
func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) {
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID)
func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error) {
if ts == 0 {
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID)
}
for _, id := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[id]
if ok && partMeta.PartitionName == partitionName {
return partMeta, nil
}
}
return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName)
}
collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
collVal, err := mt.client.Load(collKey, ts)
if err != nil {
return pb.PartitionInfo{}, err
}
collMeta := pb.CollectionMeta{}
err = proto.UnmarshalText(collVal, &collMeta)
if err != nil {
return pb.PartitionInfo{}, err
}
for _, id := range collMeta.PartitionIDs {
partMeta, ok := mt.partitionID2Meta[id]
if ok && partMeta.PartitionName == partitionName {
partKey := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, id)
partVal, err := mt.client.Load(partKey, ts)
if err != nil {
log.Debug("load partition meta failed", zap.String("collection name", collMeta.Schema.Name), zap.Int64("partition id", id))
continue
}
partMeta := pb.PartitionInfo{}
err = proto.UnmarshalText(partVal, &partMeta)
if err != nil {
log.Debug("unmarshal partition meta failed", zap.Error(err))
continue
}
if partMeta.PartitionName == partitionName {
return partMeta, nil
}
}
return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName)
}
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) {
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.getPartitionByName(collID, partitionName)
return mt.getPartitionByName(collID, partitionName, ts)
}
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool {
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, err := mt.getPartitionByName(collID, partitionName)
_, err := mt.getPartitionByName(collID, partitionName, ts)
return err == nil
}
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.UniqueID, error) {
//return timestamp, partitionid, error
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.Timestamp, typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if partitionName == Params.DefaultPartitionName {
return 0, fmt.Errorf("default partition cannot be deleted")
return 0, 0, fmt.Errorf("default partition cannot be deleted")
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return 0, fmt.Errorf("can't find collection id = %d", collID)
return 0, 0, fmt.Errorf("can't find collection id = %d", collID)
}
// check tag exists
@ -529,7 +615,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
}
if !exist {
return 0, fmt.Errorf("partition %s does not exist", partitionName)
return 0, 0, fmt.Errorf("partition %s does not exist", partitionName)
}
delete(mt.partitionID2Meta, partMeta.PartitionID)
collMeta.PartitionIDs = pd
@ -560,34 +646,48 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
return 0, 0, err
}
return partMeta.PartitionID, nil
return ts, partMeta.PartitionID, nil
}
func (mt *metaTable) GetPartitionByID(partitionID typeutil.UniqueID) (pb.PartitionInfo, error) {
func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (pb.PartitionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
partMeta, ok := mt.partitionID2Meta[partitionID]
if !ok {
return pb.PartitionInfo{}, fmt.Errorf("partition id = %d not exist", partitionID)
if ts == 0 {
partMeta, ok := mt.partitionID2Meta[partitionID]
if !ok {
return pb.PartitionInfo{}, fmt.Errorf("partition id = %d not exist", partitionID)
}
return partMeta, nil
}
return partMeta, nil
partKey := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, partitionID)
partVal, err := mt.client.Load(partKey, ts)
if err != nil {
return pb.PartitionInfo{}, err
}
partInfo := pb.PartitionInfo{}
err = proto.UnmarshalText(partVal, &partInfo)
if err != nil {
return pb.PartitionInfo{}, err
}
return partInfo, nil
}
func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collMeta, ok := mt.collID2Meta[seg.CollectionID]
if !ok {
return fmt.Errorf("can't find collection id = %d", seg.CollectionID)
return 0, fmt.Errorf("can't find collection id = %d", seg.CollectionID)
}
partMeta, ok := mt.partitionID2Meta[seg.PartitionID]
if !ok {
return fmt.Errorf("can't find partition id = %d", seg.PartitionID)
return 0, fmt.Errorf("can't find partition id = %d", seg.PartitionID)
}
exist := false
for _, partID := range collMeta.PartitionIDs {
@ -597,7 +697,7 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
}
}
if !exist {
return fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID)
return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID)
}
exist = false
for _, segID := range partMeta.SegmentIDs {
@ -606,7 +706,7 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
}
}
if exist {
return fmt.Errorf("segment id = %d exist", seg.ID)
return 0, fmt.Errorf("segment id = %d exist", seg.ID)
}
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.ID)
mt.partitionID2Meta[seg.PartitionID] = partMeta
@ -615,30 +715,30 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID)
v := proto.MarshalTextString(&partMeta)
err := mt.client.Save(k, v)
ts, err := mt.client.Save(k, v)
if err != nil {
_ = mt.reloadFromKV()
return err
return 0, err
}
return nil
return ts, nil
}
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collID, ok := mt.segID2CollID[segIdxInfo.SegmentID]
if !ok {
return fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return fmt.Errorf("collection id = %d not found", collID)
return 0, fmt.Errorf("collection id = %d not found", collID)
}
partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID]
if !ok {
return fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
}
exist := false
for _, fidx := range collMeta.FieldIndexes {
@ -648,7 +748,7 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
}
}
if !exist {
return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
}
segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
@ -660,9 +760,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
if ok {
if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
return nil
return 0, nil
}
return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
}
}
@ -670,34 +770,35 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
v := proto.MarshalTextString(segIdxInfo)
err := mt.client.Save(k, v)
ts, err := mt.client.Save(k, v)
if err != nil {
_ = mt.reloadFromKV()
return err
return 0, err
}
if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
mt.flushedSegID[segIdxInfo.SegmentID] = true
}
return nil
return ts, nil
}
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error) {
//return timestamp, index id, is dropped, error
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collID, ok := mt.collName2ID[collName]
if !ok {
return 0, false, fmt.Errorf("collection name = %s not exist", collName)
return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName)
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return 0, false, fmt.Errorf("collection name = %s not has meta", collName)
return 0, 0, false, fmt.Errorf("collection name = %s not has meta", collName)
}
fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
if err != nil {
return 0, false, err
return 0, 0, false, err
}
fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
var dropIdxID typeutil.UniqueID
@ -722,7 +823,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
}
if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
return 0, false, nil
return 0, 0, false, nil
}
collMeta.FieldIndexes = fieldIdxInfo
mt.collID2Meta[collID] = collMeta
@ -751,13 +852,13 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
}
err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
if err != nil {
_ = mt.reloadFromKV()
return 0, false, err
return 0, 0, false, err
}
return dropIdxID, true, nil
return ts, dropIdxID, true, nil
}
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
@ -930,7 +1031,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
err = mt.client.MultiSave(meta)
_, err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err
@ -953,7 +1054,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
err = mt.client.MultiSave(meta)
_, err = mt.client.MultiSave(meta)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err

View File

@ -19,7 +19,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -32,36 +31,35 @@ import (
type mockTestKV struct {
kv.TxnKV
loadWithPrefix func(key string) ([]string, []string, error)
save func(key, value string) error
multiSave func(kvs map[string]string) error
multiRemoveWithPrefix func(keys []string) error
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) error
loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error)
save func(key, value string) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) (typeutil.Timestamp, error)
}
func (m *mockTestKV) LoadWithPrefix(key string) ([]string, []string, error) {
return m.loadWithPrefix(key)
func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return m.loadWithPrefix(key, ts)
}
func (m *mockTestKV) Load(key string, ts typeutil.Timestamp) (string, error) {
return "", nil
}
func (m *mockTestKV) Save(key, value string) error {
func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) {
return m.save(key, value)
}
func (m *mockTestKV) MultiSave(kvs map[string]string) error {
func (m *mockTestKV) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) {
return m.multiSave(kvs)
}
func (m *mockTestKV) MultiRemoveWithPrefix(keys []string) error {
return m.multiRemoveWithPrefix(keys)
}
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals)
}
func Test_MockKV(t *testing.T) {
k1 := &mockTestKV{}
prefix := make(map[string][]string)
k1.loadWithPrefix = func(key string) ([]string, []string, error) {
k1.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
if val, ok := prefix[key]; ok {
return nil, val, nil
}
@ -131,17 +129,17 @@ func Test_MockKV(t *testing.T) {
m1, err := NewMetaTable(k1)
assert.Nil(t, err)
k1.save = func(key, value string) error {
return fmt.Errorf("save tenant error")
k1.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save tenant error")
}
err = m1.AddTenant(&pb.TenantMeta{})
_, err = m1.AddTenant(&pb.TenantMeta{})
assert.NotNil(t, err)
assert.EqualError(t, err, "save tenant error")
k1.save = func(key, value string) error {
return fmt.Errorf("save proxy error")
k1.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save proxy error")
}
err = m1.AddProxy(&pb.ProxyMeta{})
_, err = m1.AddProxy(&pb.ProxyMeta{})
assert.NotNil(t, err)
assert.EqualError(t, err, "save proxy error")
}
@ -168,11 +166,18 @@ func TestMetaTable(t *testing.T) {
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
vtso++
return vtso
}
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
ekv := etcdkv.NewEtcdKV(etcdCli, rootPath)
assert.NotNil(t, ekv)
mt, err := NewMetaTable(ekv)
skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso)
assert.Nil(t, err)
assert.NotNil(t, skv)
mt, err := NewMetaTable(skv)
assert.Nil(t, err)
collInfo := &pb.CollectionInfo{
@ -249,42 +254,43 @@ func TestMetaTable(t *testing.T) {
t.Run("add collection", func(t *testing.T) {
partInfoDefault.SegmentIDs = []int64{segID}
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.NotNil(t, err)
partInfoDefault.SegmentIDs = []int64{}
collInfo.PartitionIDs = []int64{segID}
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.NotNil(t, err)
collInfo.PartitionIDs = []int64{}
err = mt.AddCollection(collInfo, partInfoDefault, nil, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, nil, "")
assert.NotNil(t, err)
err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
assert.Nil(t, err)
collMeta, err := mt.GetCollectionByName("testColl")
collMeta, err := mt.GetCollectionByName("testColl", 0)
assert.Nil(t, err)
assert.Equal(t, partIDDefault, collMeta.PartitionIDs[0])
assert.Equal(t, 1, len(collMeta.PartitionIDs))
assert.True(t, mt.HasCollection(collInfo.ID))
assert.True(t, mt.HasCollection(collInfo.ID, 0))
field, err := mt.GetFieldSchema("testColl", "field110")
assert.Nil(t, err)
assert.Equal(t, collInfo.Schema.Fields[0].FieldID, field.FieldID)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
t.Run("add partition", func(t *testing.T) {
assert.Nil(t, mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, ""))
_, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "")
assert.Nil(t, err)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
@ -295,16 +301,25 @@ func TestMetaTable(t *testing.T) {
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
assert.NotNil(t, mt.AddSegment(seg))
_, err := mt.AddSegment(seg)
assert.Nil(t, err)
_, err = mt.AddSegment(seg)
assert.NotNil(t, err)
seg.ID = segID2
seg.CollectionID = collIDInvalid
assert.NotNil(t, mt.AddSegment(seg))
_, err = mt.AddSegment(seg)
assert.NotNil(t, err)
seg.CollectionID = collID
seg.PartitionID = partIDInvalid
assert.NotNil(t, mt.AddSegment(seg))
_, err = mt.AddSegment(seg)
assert.NotNil(t, err)
seg.PartitionID = partID
assert.Nil(t, mt.AddSegment(seg))
_, err = mt.AddSegment(seg)
assert.Nil(t, err)
})
t.Run("add segment index", func(t *testing.T) {
@ -314,15 +329,15 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID,
BuildID: buildID,
}
err := mt.AddIndex(&segIdxInfo)
_, err := mt.AddIndex(&segIdxInfo)
assert.Nil(t, err)
// it's legal to add index twice
err = mt.AddIndex(&segIdxInfo)
_, err = mt.AddIndex(&segIdxInfo)
assert.Nil(t, err)
segIdxInfo.BuildID = 202
err = mt.AddIndex(&segIdxInfo)
_, err = mt.AddIndex(&segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
})
@ -408,25 +423,25 @@ func TestMetaTable(t *testing.T) {
te := pb.TenantMeta{
ID: 100,
}
err := mt.AddTenant(&te)
_, err := mt.AddTenant(&te)
assert.Nil(t, err)
po := pb.ProxyMeta{
ID: 101,
}
err = mt.AddProxy(&po)
_, err = mt.AddProxy(&po)
assert.Nil(t, err)
_, err = NewMetaTable(ekv)
_, err = NewMetaTable(skv)
assert.Nil(t, err)
})
t.Run("drop index", func(t *testing.T) {
idx, ok, err := mt.DropIndex("testColl", "field110", "field110")
_, idx, ok, err := mt.DropIndex("testColl", "field110", "field110")
assert.Nil(t, err)
assert.True(t, ok)
assert.Equal(t, indexID, idx)
_, ok, err = mt.DropIndex("testColl", "field110", "field110-error")
_, _, ok, err = mt.DropIndex("testColl", "field110", "field110-error")
assert.Nil(t, err)
assert.False(t, ok)
@ -444,24 +459,24 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop partition", func(t *testing.T) {
id, err := mt.DeletePartition(collID, partInfo.PartitionName, "")
_, id, err := mt.DeletePartition(collID, partInfo.PartitionName, "")
assert.Nil(t, err)
assert.Equal(t, partID, id)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
t.Run("drop collection", func(t *testing.T) {
err = mt.DeleteCollection(collIDInvalid, "")
_, err = mt.DeleteCollection(collIDInvalid, "")
assert.NotNil(t, err)
err = mt.DeleteCollection(collID, "")
_, err = mt.DeleteCollection(collID, "")
assert.Nil(t, err)
// check DD operation flag
flag, err := mt.client.Load(DDMsgSendPrefix)
flag, err := mt.client.Load(DDMsgSendPrefix, 0)
assert.Nil(t, err)
assert.Equal(t, "false", flag)
})
@ -471,42 +486,42 @@ func TestMetaTable(t *testing.T) {
mt.client = mockKV
t.Run("add collection failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
})
t.Run("delete collection failed", func(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) error {
return fmt.Errorf("milti save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("milti save and remove with prefix error")
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
err = mt.DeleteCollection(collInfo.ID, "")
_, err = mt.DeleteCollection(collInfo.ID, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "milti save and remove with prefix error")
})
t.Run("get collection failed", func(t *testing.T) {
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
collInfo.PartitionIDs = nil
err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
@ -514,10 +529,11 @@ func TestMetaTable(t *testing.T) {
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
_, err = mt.AddSegment(seg)
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.GetCollectionByName(collInfo.Schema.Name)
_, err = mt.GetCollectionByName(collInfo.Schema.Name, 0)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection: %s", collInfo.Schema.Name))
@ -532,130 +548,130 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add partition failed", func(t *testing.T) {
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
err = mt.AddPartition(2, "no-part", 22, "")
_, err = mt.AddPartition(2, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
err = mt.AddPartition(coll.ID, "no-part", 22, "")
_, err = mt.AddPartition(coll.ID, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
coll.PartitionIDs = []int64{partInfo.PartitionID}
mt.collID2Meta[coll.ID] = coll
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
mockKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
err = mt.AddPartition(coll.ID, "no-part", 22, "")
_, err = mt.AddPartition(coll.ID, "no-part", 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "")
_, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName))
err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "")
_, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID))
})
t.Run("has partition failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName))
assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName, 0))
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName))
assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName, 0))
})
t.Run("delete partition failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "")
_, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "default partition cannot be deleted")
_, err = mt.DeletePartition(collInfo.ID, "abc", "")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", "")
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
pm := mt.partitionID2Meta[partInfo.PartitionID]
pm.SegmentIDs = []int64{11, 12, 13}
mt.partitionID2Meta[pm.PartitionID] = pm
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error {
return fmt.Errorf("multi save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "")
_, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "")
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save and remove with prefix error")
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.DeletePartition(collInfo.ID, "abc", "")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", "")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID))
_, err = mt.GetPartitionByID(11)
_, err = mt.GetPartitionByID(1, 11, 0)
assert.NotNil(t, err)
assert.EqualError(t, err, "partition id = 11 not exist")
})
t.Run("add segment failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
noPart := pb.PartitionInfo{
@ -670,7 +686,7 @@ func TestMetaTable(t *testing.T) {
CollectionID: collInfo.ID,
PartitionID: noPart.PartitionID,
}
err = mt.AddSegment(seg)
_, err = mt.AddSegment(seg)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID))
@ -679,29 +695,29 @@ func TestMetaTable(t *testing.T) {
CollectionID: collInfo.ID,
PartitionID: partInfo.PartitionID,
}
mockKV.save = func(key, value string) error {
return fmt.Errorf("save error")
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save error")
}
err = mt.AddSegment(seg)
_, err = mt.AddSegment(seg)
assert.NotNil(t, err)
assert.EqualError(t, err, "save error")
})
t.Run("add index failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
@ -709,7 +725,8 @@ func TestMetaTable(t *testing.T) {
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(seg))
_, err = mt.AddSegment(seg)
assert.Nil(t, err)
segIdxInfo := &pb.SegmentIndexInfo{
SegmentID: segID,
@ -717,22 +734,22 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID2,
BuildID: buildID,
}
err = mt.AddIndex(segIdxInfo)
_, err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
mt.segID2PartitionID = make(map[int64]int64)
err = mt.AddIndex(segIdxInfo)
_, err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID))
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
err = mt.AddIndex(segIdxInfo)
_, err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
mt.segID2CollID = make(map[int64]int64)
err = mt.AddIndex(segIdxInfo)
_, err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID))
@ -740,46 +757,47 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, err = mt.AddSegment(seg)
assert.Nil(t, err)
assert.Nil(t, mt.AddSegment(seg))
segIdxInfo.IndexID = indexID
mockKV.save = func(key, value string) error {
return fmt.Errorf("save error")
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save error")
}
err = mt.AddIndex(segIdxInfo)
_, err = mt.AddIndex(segIdxInfo)
assert.NotNil(t, err)
assert.EqualError(t, err, "save error")
})
t.Run("drop index failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, _, err = mt.DropIndex("abc", "abc", "abc")
_, _, _, err = mt.DropIndex("abc", "abc", "abc")
assert.NotNil(t, err)
assert.EqualError(t, err, "collection name = abc not exist")
mt.collName2ID["abc"] = 2
_, _, err = mt.DropIndex("abc", "abc", "abc")
_, _, _, err = mt.DropIndex("abc", "abc", "abc")
assert.NotNil(t, err)
assert.EqualError(t, err, "collection name = abc not has meta")
_, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc")
_, _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc")
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed abc", collInfo.Schema.Name))
@ -796,7 +814,7 @@ func TestMetaTable(t *testing.T) {
}
mt.collID2Meta[coll.ID] = coll
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
_, idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
assert.Zero(t, idxID)
assert.False(t, isDroped)
assert.Nil(t, err)
@ -804,32 +822,32 @@ func TestMetaTable(t *testing.T) {
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error {
return fmt.Errorf("multi save and remove with prefix error")
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
_, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save and remove with prefix error")
})
t.Run("get segment index info by id", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, err = mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc")
@ -849,14 +867,16 @@ func TestMetaTable(t *testing.T) {
CollectionID: collID,
PartitionID: partID,
}
assert.Nil(t, mt.AddSegment(segInfo))
_, err = mt.AddSegment(segInfo)
assert.Nil(t, err)
segIdx := &pb.SegmentIndexInfo{
SegmentID: segID,
FieldID: fieldID,
IndexID: indexID,
BuildID: buildID,
}
assert.Nil(t, mt.AddIndex(segIdx))
_, err = mt.AddIndex(segIdx)
assert.Nil(t, err)
idx, err := mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, idxInfo[0].IndexName)
assert.Nil(t, err)
assert.Equal(t, segIdx.IndexID, idx.IndexID)
@ -871,20 +891,20 @@ func TestMetaTable(t *testing.T) {
})
t.Run("get field schema failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -899,7 +919,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("is segment indexed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
err := mt.reloadFromKV()
@ -923,7 +943,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("get not indexed segments", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
err := mt.reloadFromKV()
@ -945,17 +965,17 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx)
@ -969,18 +989,18 @@ func TestMetaTable(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
mt.indexID2Meta = bakMeta
mockKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
coll := mt.collID2Meta[collInfo.ID]
coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1})
@ -998,8 +1018,8 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
idx.IndexName = idxInfo[0].IndexName
mockKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx)
assert.NotNil(t, err)
@ -1007,7 +1027,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("get index by name failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
err := mt.reloadFromKV()
@ -1018,17 +1038,17 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string) error {
return nil
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) error {
return nil
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
return 0, nil
}
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName)
@ -1041,7 +1061,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add flused segment failed", func(t *testing.T) {
mockKV.loadWithPrefix = func(key string) ([]string, []string, error) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
err := mt.reloadFromKV()

View File

@ -222,7 +222,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err
}
err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr)
_, err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr)
if err != nil {
return err
}
@ -266,7 +266,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
@ -286,7 +286,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return err
}
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr)
_, err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr)
if err != nil {
return err
}
@ -336,7 +336,7 @@ func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasCollection {
return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err == nil {
t.HasCollection = true
} else {
@ -375,12 +375,12 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
var err error
if t.Req.CollectionName != "" {
collInfo, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
collInfo, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
} else {
collInfo, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
collInfo, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
if err != nil {
return err
}
@ -427,7 +427,7 @@ func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowCollections {
return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.ListCollections()
coll, err := t.core.MetaTable.ListCollections(0)
if err != nil {
return err
}
@ -460,7 +460,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreatePartition {
return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
@ -486,7 +486,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return err
}
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr)
_, err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr)
if err != nil {
return err
}
@ -528,11 +528,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName)
partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName, 0)
if err != nil {
return err
}
@ -554,7 +554,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return err
}
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr)
_, _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr)
if err != nil {
return err
}
@ -597,11 +597,11 @@ func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasPartition {
return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName)
t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName, 0)
return nil
}
@ -634,15 +634,15 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
var coll *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName == "" {
coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
} else {
coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
}
if err != nil {
return err
}
for _, partID := range coll.PartitionIDs {
partMeta, err := t.core.MetaTable.GetPartitionByID(partID)
partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, partID, 0)
if err != nil {
return err
}
@ -678,7 +678,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeSegment {
return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
if err != nil {
return err
}
@ -687,7 +687,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if exist {
break
}
partMeta, err := t.core.MetaTable.GetPartitionByID(partID)
partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, partID, 0)
if err != nil {
return err
}
@ -738,7 +738,7 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowSegments {
return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
if err != nil {
return err
}
@ -752,7 +752,7 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if !exist {
return fmt.Errorf("partition id = %d not belong to collection id = %d", t.Req.PartitionID, t.Req.CollectionID)
}
partMeta, err := t.core.MetaTable.GetPartitionByID(t.Req.PartitionID)
partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, t.Req.PartitionID, 0)
if err != nil {
return err
}
@ -900,6 +900,6 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
_, _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
return err
}