set timestamp on dd requests (#5298)

- set dd request's time tick on master
- send time tick into dd channel

See also: #5291

Signed-off-by: yefu.chen yefu.chen@zilliz.com
pull/5318/head
neza2017 2021-05-20 14:14:14 +08:00 committed by GitHub
parent 38f5b0826f
commit c0daf8e41d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 424 additions and 302 deletions

View File

@ -14,4 +14,5 @@ master:
minSegmentSizeToEnableIndex: 1024
nodeID: 100
timeout: 3600 # time out, 5 seconds
timeTickInterval: 200 # ms

View File

@ -15,7 +15,6 @@ import (
"context"
"fmt"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
@ -294,21 +293,21 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_CreateCollection)
assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again")
//time stamp go back
schema.Name = "testColl-goback"
sbf, err = proto.Marshal(&schema)
assert.Nil(t, err)
req.CollectionName = schema.Name
req.Schema = sbf
req.Base.MsgID = 103
req.Base.Timestamp = 103
req.Base.SourceID = 103
status, err = cli.CreateCollection(ctx, req)
assert.Nil(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason)
assert.Nil(t, err)
assert.True(t, matched)
//time stamp go back, master response to add the timestamp, so the time tick will never go back
//schema.Name = "testColl-goback"
//sbf, err = proto.Marshal(&schema)
//assert.Nil(t, err)
//req.CollectionName = schema.Name
//req.Schema = sbf
//req.Base.MsgID = 103
//req.Base.Timestamp = 103
//req.Base.SourceID = 103
//status, err = cli.CreateCollection(ctx, req)
//assert.Nil(t, err)
//assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
//matched, err := regexp.MatchString("input timestamp = [0-9]+, last dd time stamp = [0-9]+", status.Reason)
//assert.Nil(t, err)
//assert.True(t, matched)
})
t.Run("has collection", func(t *testing.T) {

View File

@ -38,7 +38,7 @@ type TxnKV interface {
type SnapShotKV interface {
Save(key, value string) (typeutil.Timestamp, error)
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string) (typeutil.Timestamp, error)
MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}

View File

@ -53,6 +53,10 @@ import (
// milvuspb -> milvuspb
// masterpb2 -> masterpb (master_service)
//NEZA2017, DEBUG FLAG for milvus 2.0, this part should remove when milvus 2.0 release
var SetDDTimeTimeByMaster bool = false
// ------------------ struct -----------------------
// DdOperation used to save ddMsg into ETCD
@ -78,12 +82,12 @@ type Core struct {
MetaTable *metaTable
//id allocator
idAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
idAllocatorUpdate func() error
IDAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
IDAllocatorUpdate func() error
//tso allocator
tsoAllocator func(count uint32) (typeutil.Timestamp, error)
tsoAllocatorUpdate func() error
TSOAllocator func(count uint32) (typeutil.Timestamp, error)
TSOAllocatorUpdate func() error
//inner members
ctx context.Context
@ -130,8 +134,7 @@ type Core struct {
ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
//dd request scheduler
ddReqQueue chan reqTask //dd request will be push into this chan
lastDdTimeStamp typeutil.Timestamp
ddReqQueue chan reqTask //dd request will be push into this chan
//time tick loop
lastTimeTick typeutil.Timestamp
@ -169,16 +172,16 @@ func (c *Core) checkInit() error {
if c.MetaTable == nil {
return fmt.Errorf("MetaTable is nil")
}
if c.idAllocator == nil {
if c.IDAllocator == nil {
return fmt.Errorf("idAllocator is nil")
}
if c.idAllocatorUpdate == nil {
if c.IDAllocatorUpdate == nil {
return fmt.Errorf("idAllocatorUpdate is nil")
}
if c.tsoAllocator == nil {
if c.TSOAllocator == nil {
return fmt.Errorf("tsoAllocator is nil")
}
if c.tsoAllocatorUpdate == nil {
if c.TSOAllocatorUpdate == nil {
return fmt.Errorf("tsoAllocatorUpdate is nil")
}
if c.etcdCli == nil {
@ -243,42 +246,57 @@ func (c *Core) startDdScheduler() {
log.Debug("dd chan is closed, exit task execution loop")
return
}
ts, err := task.Ts()
if err != nil {
task.Notify(err)
break
}
if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp {
task.Notify(fmt.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp))
break
}
err = task.Execute(task.Ctx())
err := task.Execute(task.Ctx())
task.Notify(err)
if ts > c.lastDdTimeStamp {
c.lastDdTimeStamp = ts
}
}
}
}
func (c *Core) startTimeTickLoop() {
for {
select {
case <-c.ctx.Done():
log.Debug("close master time tick loop")
return
case tt, ok := <-c.ProxyTimeTickChan:
if !ok {
log.Warn("proxyTimeTickStream is closed, exit time tick loop")
if SetDDTimeTimeByMaster {
ticker := time.NewTimer(time.Duration(Params.TimeTickInterval) * time.Millisecond)
cnt := 0
for {
select {
case <-c.ctx.Done():
log.Debug("master context closed", zap.Error(c.ctx.Err()))
return
case <-ticker.C:
if len(c.ddReqQueue) < 2 || cnt > 5 {
tt := &TimetickTask{
baseReqTask: baseReqTask{
ctx: c.ctx,
cv: make(chan error, 1),
core: c,
},
}
c.ddReqQueue <- tt
cnt = 0
} else {
cnt++
}
}
if tt <= c.lastTimeTick {
log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt))
}
} else {
for {
select {
case <-c.ctx.Done():
log.Debug("close master time tick loop")
return
case tt, ok := <-c.ProxyTimeTickChan:
if !ok {
log.Warn("proxyTimeTickStream is closed, exit time tick loop")
return
}
if tt <= c.lastTimeTick {
log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt))
}
if err := c.SendTimeTick(tt); err != nil {
log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error()))
}
c.lastTimeTick = tt
}
if err := c.SendTimeTick(tt); err != nil {
log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error()))
}
c.lastTimeTick = tt
}
}
}
@ -360,11 +378,11 @@ func (c *Core) tsLoop() {
for {
select {
case <-tsoTicker.C:
if err := c.tsoAllocatorUpdate(); err != nil {
if err := c.TSOAllocatorUpdate(); err != nil {
log.Warn("failed to update timestamp: ", zap.Error(err))
continue
}
if err := c.idAllocatorUpdate(); err != nil {
if err := c.IDAllocatorUpdate(); err != nil {
log.Warn("failed to update id: ", zap.Error(err))
continue
}
@ -651,7 +669,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
ts, err := c.tsoAllocator(1)
ts, err := c.TSOAllocator(1)
if err != nil {
return nil, err
}
@ -679,7 +697,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
}
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
ts, err := c.tsoAllocator(1)
ts, err := c.TSOAllocator(1)
if err != nil {
return 0, err
}
@ -815,7 +833,7 @@ func (c *Core) Init() error {
for {
var ts typeutil.Timestamp
var err error
if ts, err = c.tsoAllocator(1); err == nil {
if ts, err = c.TSOAllocator(1); err == nil {
return ts
}
time.Sleep(100 * time.Millisecond)
@ -842,10 +860,10 @@ func (c *Core) Init() error {
if initError = idAllocator.Initialize(); initError != nil {
return
}
c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
c.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return idAllocator.Alloc(count)
}
c.idAllocatorUpdate = func() error {
c.IDAllocatorUpdate = func() error {
return idAllocator.UpdateID()
}
@ -853,10 +871,10 @@ func (c *Core) Init() error {
if initError = tsoAllocator.Initialize(); initError != nil {
return
}
c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
c.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) {
return tsoAllocator.Alloc(count)
}
c.tsoAllocatorUpdate = func() error {
c.TSOAllocatorUpdate = func() error {
return tsoAllocator.UpdateTSO()
}
@ -1584,7 +1602,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
}
func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
ts, err := c.tsoAllocator(in.Count)
ts, err := c.TSOAllocator(in.Count)
if err != nil {
log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
return &masterpb.AllocTimestampResponse{
@ -1608,7 +1626,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe
}
func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
start, _, err := c.idAllocator(in.Count)
start, _, err := c.IDAllocator(in.Count)
if err != nil {
log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
return &masterpb.AllocIDResponse{

View File

@ -1581,10 +1581,10 @@ func TestMasterService(t *testing.T) {
})
t.Run("alloc_error", func(t *testing.T) {
core.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
core.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return 0, 0, fmt.Errorf("id allocator error test")
}
core.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
core.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("tso allcoator error test")
}
r1 := &masterpb.AllocTimestampRequest{
@ -1615,6 +1615,152 @@ func TestMasterService(t *testing.T) {
})
}
func TestMasterService2(t *testing.T) {
const (
dbName = "testDb"
collName = "testColl"
partName = "testPartition"
)
SetDDTimeTimeByMaster = true
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msFactory := msgstream.NewPmsFactory()
Params.Init()
core, err := NewCore(ctx, msFactory)
assert.Nil(t, err)
randVal := rand.Int()
Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal)
Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
pm := &proxyMock{
randVal: randVal,
collArray: make([]string, 0, 16),
mutex: sync.Mutex{},
}
err = core.SetProxyService(ctx, pm)
assert.Nil(t, err)
dm := &dataMock{randVal: randVal}
err = core.SetDataService(ctx, dm)
assert.Nil(t, err)
im := &indexMock{
fileArray: []string{},
idxBuildID: []int64{},
idxID: []int64{},
idxDropID: []int64{},
mutex: sync.Mutex{},
}
err = core.SetIndexService(im)
assert.Nil(t, err)
qm := &queryMock{
collID: nil,
mutex: sync.Mutex{},
}
err = core.SetQueryService(qm)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
err = core.Start()
assert.Nil(t, err)
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
proxyTimeTickStream, _ := msFactory.NewMsgStream(ctx)
proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel})
dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx)
dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
timeTickStream.Start()
ddStream, _ := msFactory.NewMsgStream(ctx)
ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream.Start()
time.Sleep(time.Second)
getNotTTMsg := func(ch <-chan *msgstream.MsgPack, n int) []msgstream.TsMsg {
msg := make([]msgstream.TsMsg, 0, n)
for {
m, ok := <-ch
assert.True(t, ok)
for _, m := range m.Msgs {
if _, ok := (m).(*msgstream.TimeTickMsg); !ok {
msg = append(msg, m)
}
}
if len(msg) >= n {
return msg
}
}
}
t.Run("time tick", func(t *testing.T) {
ttmsg, ok := <-timeTickStream.Chan()
assert.True(t, ok)
assert.Equal(t, 1, len(ttmsg.Msgs))
ttm, ok := (ttmsg.Msgs[0]).(*msgstream.TimeTickMsg)
assert.True(t, ok)
assert.Greater(t, ttm.Base.Timestamp, typeutil.Timestamp(0))
ddmsg, ok := <-ddStream.Chan()
assert.True(t, ok)
assert.Equal(t, 1, len(ddmsg.Msgs))
ddm, ok := (ddmsg.Msgs[0]).(*msgstream.TimeTickMsg)
assert.True(t, ok)
assert.Greater(t, ddm.Base.Timestamp, typeutil.Timestamp(0))
})
t.Run("create collection", func(t *testing.T) {
schema := schemapb.CollectionSchema{
Name: collName,
}
sbf, err := proto.Marshal(&schema)
assert.Nil(t, err)
req := &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
Timestamp: 100,
},
DbName: dbName,
CollectionName: collName,
Schema: sbf,
}
status, err := core.CreateCollection(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
msg := getNotTTMsg(ddStream.Chan(), 2)
assert.GreaterOrEqual(t, len(msg), 2)
m1, ok := (msg[0]).(*msgstream.CreateCollectionMsg)
assert.True(t, ok)
m2, ok := (msg[1]).(*msgstream.CreatePartitionMsg)
assert.True(t, ok)
assert.Equal(t, m1.Base.Timestamp, m2.Base.Timestamp)
t.Log("time tick", m1.Base.Timestamp)
})
}
func TestCheckInit(t *testing.T) {
c, err := NewCore(context.Background(), nil)
assert.Nil(t, err)
@ -1629,25 +1775,25 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
c.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return 0, 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.idAllocatorUpdate = func() error {
c.IDAllocatorUpdate = func() error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
c.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.tsoAllocatorUpdate = func() error {
c.TSOAllocatorUpdate = func() error {
return nil
}
err = c.checkInit()

View File

@ -313,7 +313,7 @@ func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error)
return string(resp.Kvs[0].Value), nil
}
func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@ -321,10 +321,15 @@ func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, er
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(kvs)+1)
ops := make([]clientv3.Op, 0, len(kvs)+2)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
if addition != nil {
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
@ -370,7 +375,7 @@ func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str
return keys, values, nil
}
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@ -378,10 +383,15 @@ func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, re
ts := ms.timeAllactor()
strTs := strconv.FormatInt(int64(ts), 10)
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1)
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+2)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
if addition != nil {
if k, v, e := addition(ts); e == nil {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, k), v))
}
}
for _, key := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key)))
}

View File

@ -282,7 +282,7 @@ func TestMultiSave(t *testing.T) {
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSave(saves)
ts, err := ms.MultiSave(saves, nil)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
@ -353,7 +353,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm)
ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm, nil)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}

View File

@ -208,6 +208,20 @@ func (mt *metaTable) reloadFromKV() error {
return nil
}
func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
if op == nil {
return nil
}
meta[DDMsgSendPrefix] = "false"
return func(ts typeutil.Timestamp) (string, string, error) {
val, err := op(ts)
if err != nil {
return "", "", err
}
return DDOperationPrefix, val, nil
}
}
func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) {
mt.tenantLock.Lock()
defer mt.tenantLock.Unlock()
@ -238,7 +252,7 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) {
return ts, nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) (typeutil.Timestamp, error) {
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -285,10 +299,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
}
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
ts, err := mt.client.MultiSave(meta)
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSave(meta, addition)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
@ -297,7 +309,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn
return ts, nil
}
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) {
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -349,12 +361,9 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string)
}
// save ddOpStr into etcd
var saveMeta = map[string]string{
DDOperationPrefix: ddOpStr,
DDMsgSendPrefix: "false",
}
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys)
var saveMeta = map[string]string{}
addition := mt.getAdditionKV(ddOpStr, saveMeta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
@ -478,7 +487,7 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) ([]string, error) {
return colls, nil
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) {
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
@ -520,10 +529,9 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
meta := map[string]string{k1: v1, k2: v2}
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSave(meta)
ts, err := mt.client.MultiSave(meta, addition)
if err != nil {
_ = mt.reloadFromKV()
return 0, err
@ -589,7 +597,7 @@ func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string
}
//return timestamp, partitionid, error
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.Timestamp, typeutil.UniqueID, error) {
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -647,10 +655,9 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
// save ddOpStr into etcd
meta[DDOperationPrefix] = ddOpStr
meta[DDMsgSendPrefix] = "false"
addition := mt.getAdditionKV(ddOpStr, meta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition)
if err != nil {
_ = mt.reloadFromKV()
return 0, 0, err
@ -856,7 +863,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
}
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, nil)
if err != nil {
_ = mt.reloadFromKV()
return 0, 0, false, err
@ -1035,7 +1042,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta)
_, err = mt.client.MultiSave(meta, nil)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err
@ -1058,7 +1065,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
meta[k] = v
}
_, err = mt.client.MultiSave(meta)
_, err = mt.client.MultiSave(meta, nil)
if err != nil {
_ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err

View File

@ -33,8 +33,8 @@ type mockTestKV struct {
loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error)
save func(key, value string) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) (typeutil.Timestamp, error)
multiSave func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}
func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
@ -48,12 +48,12 @@ func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) {
return m.save(key, value)
}
func (m *mockTestKV) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) {
return m.multiSave(kvs)
func (m *mockTestKV) MultiSave(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSave(kvs, addition)
}
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals)
func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return m.multiSaveAndRemoveWithPrefix(saves, removals, addition)
}
func Test_MockKV(t *testing.T) {
@ -253,21 +253,25 @@ func TestMetaTable(t *testing.T) {
},
}
ddOp := func(ts typeutil.Timestamp) (string, error) {
return "", nil
}
t.Run("add collection", func(t *testing.T) {
partInfoDefault.SegmentIDs = []int64{segID}
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp)
assert.NotNil(t, err)
partInfoDefault.SegmentIDs = []int64{}
collInfo.PartitionIDs = []int64{segID}
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp)
assert.NotNil(t, err)
collInfo.PartitionIDs = []int64{}
_, err = mt.AddCollection(collInfo, partInfoDefault, nil, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, nil, ddOp)
assert.NotNil(t, err)
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, ddOp)
assert.Nil(t, err)
collMeta, err := mt.GetCollectionByName("testColl", 0)
@ -287,7 +291,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("add partition", func(t *testing.T) {
_, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "")
_, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, ddOp)
assert.Nil(t, err)
// check DD operation flag
@ -460,7 +464,7 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop partition", func(t *testing.T) {
_, id, err := mt.DeletePartition(collID, partInfo.PartitionName, "")
_, id, err := mt.DeletePartition(collID, partInfo.PartitionName, nil)
assert.Nil(t, err)
assert.Equal(t, partID, id)
@ -471,9 +475,9 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop collection", func(t *testing.T) {
_, err = mt.DeleteCollection(collIDInvalid, "")
_, err = mt.DeleteCollection(collIDInvalid, nil)
assert.NotNil(t, err)
_, err = mt.DeleteCollection(collID, "")
_, err = mt.DeleteCollection(collID, nil)
assert.Nil(t, err)
// check DD operation flag
@ -490,28 +494,28 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
collInfo.PartitionIDs = nil
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
})
t.Run("delete collection failed", func(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("milti save and remove with prefix error")
}
collInfo.PartitionIDs = nil
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, err = mt.DeleteCollection(collInfo.ID, "")
_, err = mt.DeleteCollection(collInfo.ID, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "milti save and remove with prefix error")
})
@ -522,7 +526,7 @@ func TestMetaTable(t *testing.T) {
}
collInfo.PartitionIDs = nil
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
@ -559,41 +563,41 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(2, "no-part", 22, "")
_, err = mt.AddPartition(2, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "can't find collection. id = 2")
coll := mt.collID2Meta[collInfo.ID]
coll.PartitionIDs = make([]int64, Params.MaxPartitionNum)
mt.collID2Meta[coll.ID] = coll
_, err = mt.AddPartition(coll.ID, "no-part", 22, "")
_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum))
coll.PartitionIDs = []int64{partInfo.PartitionID}
mt.collID2Meta[coll.ID] = coll
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, err = mt.AddPartition(coll.ID, "no-part", 22, "")
_, err = mt.AddPartition(coll.ID, "no-part", 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "")
_, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName))
_, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "")
_, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID))
})
@ -602,14 +606,14 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
@ -623,36 +627,36 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "")
_, _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "default partition cannot be deleted")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", "")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "partition abc does not exist")
pm := mt.partitionID2Meta[partInfo.PartitionID]
pm.SegmentIDs = []int64{11, 12, 13}
mt.partitionID2Meta[pm.PartitionID] = pm
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "")
_, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save and remove with prefix error")
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, _, err = mt.DeletePartition(collInfo.ID, "abc", "")
_, _, err = mt.DeletePartition(collInfo.ID, "abc", nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID))
@ -665,14 +669,14 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
err := mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
noPart := pb.PartitionInfo{
@ -708,7 +712,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -718,7 +722,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
seg := &datapb.SegmentInfo{
@ -758,7 +762,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.AddSegment(seg)
assert.Nil(t, err)
@ -776,7 +780,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -786,7 +790,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, _, _, err = mt.DropIndex("abc", "abc", "abc")
@ -823,10 +827,10 @@ func TestMetaTable(t *testing.T) {
err = mt.reloadFromKV()
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
mt.partitionID2Meta = make(map[int64]pb.PartitionInfo)
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) {
mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save and remove with prefix error")
}
_, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName)
@ -838,7 +842,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -848,7 +852,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, err = mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc")
@ -895,7 +899,7 @@ func TestMetaTable(t *testing.T) {
mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -905,7 +909,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@ -966,7 +970,7 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -976,7 +980,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx)
@ -990,18 +994,18 @@ func TestMetaTable(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID))
mt.indexID2Meta = bakMeta
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx)
assert.NotNil(t, err)
assert.EqualError(t, err, "multi save error")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
coll := mt.collID2Meta[collInfo.ID]
coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1})
@ -1019,7 +1023,7 @@ func TestMetaTable(t *testing.T) {
mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx
idx.IndexName = idxInfo[0].IndexName
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("multi save error")
}
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx)
@ -1039,7 +1043,7 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) {
mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, nil
}
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
@ -1049,7 +1053,7 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, "")
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err)
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName)
@ -1111,7 +1115,7 @@ func TestMetaWithTimestamp(t *testing.T) {
PartitionID: 11,
SegmentIDs: nil,
}
t1, err := mt.AddCollection(collInfo, partInfo, nil, "")
t1, err := mt.AddCollection(collInfo, partInfo, nil, nil)
assert.Nil(t, err)
collInfo.ID = 2
@ -1120,7 +1124,7 @@ func TestMetaWithTimestamp(t *testing.T) {
partInfo.PartitionID = 12
partInfo.PartitionName = "p2"
t2, err := mt.AddCollection(collInfo, partInfo, nil, "")
t2, err := mt.AddCollection(collInfo, partInfo, nil, nil)
assert.Nil(t, err)
assert.True(t, mt.HasCollection(1, 0))

View File

@ -45,7 +45,8 @@ type ParamTable struct {
DefaultIndexName string
MinSegmentSizeToEnableIndex int64
Timeout int
Timeout int
TimeTickInterval int
Log log.Config
@ -79,6 +80,7 @@ func (p *ParamTable) Init() {
p.initDefaultIndexName()
p.initTimeout()
p.initTimeTickInterval()
p.initLogCfg()
p.initRoleName()
@ -189,6 +191,10 @@ func (p *ParamTable) initTimeout() {
p.Timeout = p.ParseInt("master.timeout")
}
func (p *ParamTable) initTimeTickInterval() {
p.TimeTickInterval = p.ParseInt("master.timeTickInterval")
}
func (p *ParamTable) initLogCfg() {
p.Log = log.Config{}
format, err := p.Load("log.format")

View File

@ -61,4 +61,7 @@ func TestParamTable(t *testing.T) {
assert.NotZero(t, Params.Timeout)
t.Logf("master timeout = %d", Params.Timeout)
assert.NotZero(t, Params.TimeTickInterval)
t.Logf("master timetickerInterval = %d", Params.TimeTickInterval)
}

View File

@ -29,8 +29,6 @@ import (
type reqTask interface {
Ctx() context.Context
Type() commonpb.MsgType
Ts() (typeutil.Timestamp, error)
IgnoreTimeStamp() bool
Execute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
@ -60,6 +58,26 @@ func (bt *baseReqTask) WaitToFinish() error {
}
}
type TimetickTask struct {
baseReqTask
}
func (t *TimetickTask) Ctx() context.Context {
return t.ctx
}
func (t *TimetickTask) Type() commonpb.MsgType {
return commonpb.MsgType_TimeTick
}
func (t *TimetickTask) Execute(ctx context.Context) error {
ts, err := t.core.TSOAllocator(1)
if err != nil {
return err
}
return t.core.SendTimeTick(ts)
}
type CreateCollectionReqTask struct {
baseReqTask
Req *milvuspb.CreateCollectionRequest
@ -73,14 +91,6 @@ func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
const defaultShardsNum = 2
@ -122,15 +132,12 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
schema.Fields = append(schema.Fields, rowIDField, timeStampField)
collID, _, err := t.core.idAllocator(1)
collID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
collTs, err := t.Ts()
if err != nil {
return err
}
partID, _, err := t.core.idAllocator(1)
collTs := t.Req.Base.Timestamp
partID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
@ -217,12 +224,15 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType)
if err != nil {
return err
ddOp := func(ts typeutil.Timestamp) (string, error) {
if SetDDTimeTimeByMaster {
ddCollReq.Base.Timestamp = ts
ddPartReq.Base.Timestamp = ts
}
return EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType)
}
_, err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr)
ts, err := t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOp)
if err != nil {
return err
}
@ -236,6 +246,10 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err
}
if SetDDTimeTimeByMaster {
t.core.SendTimeTick(ts)
}
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
@ -253,14 +267,6 @@ func (t *DropCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DropCollectionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropCollection {
return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -281,12 +287,14 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropCollectionDDType)
if err != nil {
return err
ddOp := func(ts typeutil.Timestamp) (string, error) {
if SetDDTimeTimeByMaster {
ddReq.Base.Timestamp = ts
}
return EncodeDdOperation(&ddReq, nil, DropCollectionDDType)
}
_, err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr)
ts, err := t.core.MetaTable.DeleteCollection(collMeta.ID, ddOp)
if err != nil {
return err
}
@ -296,6 +304,10 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return err
}
if SetDDTimeTimeByMaster {
t.core.SendTimeTick(ts)
}
//notify query service to release collection
go func() {
if err = t.core.ReleaseCollection(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
@ -324,14 +336,6 @@ func (t *HasCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *HasCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasCollection {
return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -359,14 +363,6 @@ func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeCollection {
return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -415,14 +411,6 @@ func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowCollections {
return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -448,14 +436,6 @@ func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreatePartition {
return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -464,7 +444,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partID, _, err := t.core.idAllocator(1)
partID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
@ -481,12 +461,14 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, CreatePartitionDDType)
if err != nil {
return err
ddOp := func(ts typeutil.Timestamp) (string, error) {
if SetDDTimeTimeByMaster {
ddReq.Base.Timestamp = ts
}
return EncodeDdOperation(&ddReq, nil, CreatePartitionDDType)
}
_, err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr)
ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp)
if err != nil {
return err
}
@ -496,6 +478,10 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return err
}
if SetDDTimeTimeByMaster {
t.core.SendTimeTick(ts)
}
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
@ -516,14 +502,6 @@ func (t *DropPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DropPartitionReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -549,12 +527,14 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddOpStr, err := EncodeDdOperation(&ddReq, nil, DropPartitionDDType)
if err != nil {
return err
ddOp := func(ts typeutil.Timestamp) (string, error) {
if SetDDTimeTimeByMaster {
ddReq.Base.Timestamp = ts
}
return EncodeDdOperation(&ddReq, nil, DropPartitionDDType)
}
_, _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr)
ts, _, err := t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOp)
if err != nil {
return err
}
@ -564,6 +544,10 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return err
}
if SetDDTimeTimeByMaster {
t.core.SendTimeTick(ts)
}
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
@ -585,14 +569,6 @@ func (t *HasPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *HasPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasPartition {
return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -619,14 +595,6 @@ func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowPartitions {
return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -666,14 +634,6 @@ func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeSegment {
return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -726,14 +686,6 @@ func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowSegments {
return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -773,20 +725,12 @@ func (t *CreateIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *CreateIndexReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateIndex {
return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, _, err := t.core.idAllocator(1)
indexID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
@ -827,14 +771,6 @@ func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeIndex {
return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -873,14 +809,6 @@ func (t *DropIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropIndexReqTask) Ts() (typeutil.Timestamp, error) {
return t.Req.Base.Timestamp, nil
}
func (t *DropIndexReqTask) IgnoreTimeStamp() bool {
return false
}
func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropIndex {
return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])