mirror of https://github.com/milvus-io/milvus.git
Fix failure when saving childTasks to etcd with MultiSave (#10340)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/10349/head
parent
f9408259b4
commit
f68a3358dd
|
@ -438,9 +438,11 @@ func (scheduler *TaskScheduler) processTask(t task) error {
|
|||
var taskInfoKey string
|
||||
// assign taskID for childTask and update triggerTask's childTask to etcd
|
||||
updateKVFn := func(parentTask task) error {
|
||||
kvs := make(map[string]string)
|
||||
kvs[taskInfoKey] = strconv.Itoa(int(taskDone))
|
||||
// TODO:: if childTask.type == loadSegment, then only save segmentID to etcd instead of binlog paths
|
||||
// The binlog paths of each segment will be written into etcd in advance
|
||||
// The binlog paths will be filled in through etcd when load segment grpc is called
|
||||
for _, childTask := range parentTask.getChildTask() {
|
||||
kvs := make(map[string]string)
|
||||
id, err := scheduler.taskIDAllocator()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -454,11 +456,18 @@ func (scheduler *TaskScheduler) processTask(t task) error {
|
|||
kvs[childTaskKey] = string(blobs)
|
||||
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, childTask.getTaskID())
|
||||
kvs[stateKey] = strconv.Itoa(int(taskUndo))
|
||||
err = scheduler.client.MultiSave(kvs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := scheduler.client.MultiSave(kvs)
|
||||
|
||||
parentInfoKey := fmt.Sprintf("%s/%d", taskInfoPrefix, parentTask.getTaskID())
|
||||
err := scheduler.client.Save(parentInfoKey, strconv.Itoa(int(taskDone)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -16,20 +16,22 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// testTask is a task type used by the tests in this file. It embeds baseTask
// and carries the message header, cluster, meta and node ID it is built with.
//
// NOTE(review): baseMsg, cluster, meta and nodeID are listed twice below. This
// looks like the pre-change and the re-aligned post-change lines of the same
// diff hunk shown together; presumably only one set exists in the real
// struct — confirm against the repository.
type testTask struct {
|
||||
baseTask
|
||||
baseMsg *commonpb.MsgBase
|
||||
cluster Cluster
|
||||
meta Meta
|
||||
nodeID int64
|
||||
baseMsg *commonpb.MsgBase
|
||||
cluster Cluster
|
||||
meta Meta
|
||||
nodeID int64
|
||||
// binlogSize is the argument passed to funcutil.RandomString when execute
// builds the binlog path for a LoadCollection message.
binlogSize int
|
||||
}
|
||||
|
||||
func (tt *testTask) msgBase() *commonpb.MsgBase {
|
||||
|
@ -58,6 +60,39 @@ func (tt *testTask) execute(ctx context.Context) error {
|
|||
log.Debug("test task execute...")
|
||||
|
||||
switch tt.baseMsg.MsgType {
|
||||
case commonpb.MsgType_LoadCollection:
|
||||
binlogs := make([]*datapb.FieldBinlog, 0)
|
||||
binlogs = append(binlogs, &datapb.FieldBinlog{
|
||||
FieldID: 0,
|
||||
Binlogs: []string{funcutil.RandomString(tt.binlogSize)},
|
||||
})
|
||||
for id := 0; id < 10; id++ {
|
||||
segmentInfo := &querypb.SegmentLoadInfo{
|
||||
SegmentID: UniqueID(id),
|
||||
PartitionID: defaultPartitionID,
|
||||
CollectionID: defaultCollectionID,
|
||||
BinlogPaths: binlogs,
|
||||
}
|
||||
req := &querypb.LoadSegmentsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadSegments,
|
||||
},
|
||||
Infos: []*querypb.SegmentLoadInfo{segmentInfo},
|
||||
}
|
||||
loadTask := &loadSegmentTask{
|
||||
baseTask: &baseTask{
|
||||
ctx: tt.ctx,
|
||||
condition: newTaskCondition(tt.ctx),
|
||||
triggerCondition: tt.triggerCondition,
|
||||
},
|
||||
LoadSegmentsRequest: req,
|
||||
meta: tt.meta,
|
||||
cluster: tt.cluster,
|
||||
excludeNodeIDs: []int64{},
|
||||
}
|
||||
loadTask.setParentTask(tt)
|
||||
tt.addChildTask(loadTask)
|
||||
}
|
||||
case commonpb.MsgType_LoadSegments:
|
||||
childTask := &loadSegmentTask{
|
||||
baseTask: &baseTask{
|
||||
|
@ -416,3 +451,39 @@ func TestReloadTaskFromKV(t *testing.T) {
|
|||
assert.Equal(t, taskDone, task.getState())
|
||||
assert.Equal(t, 1, len(task.getChildTask()))
|
||||
}
|
||||
|
||||
// Test_saveInternalTaskToEtcd calls scheduler.processTask on a LoadCollection
// testTask twice: first with binlogSize 3000000, expecting an error, then with
// binlogSize 500000, expecting no error.
func Test_saveInternalTaskToEtcd(t *testing.T) {
|
||||
refreshParams()
|
||||
ctx := context.Background()
|
||||
queryCoord, err := startQueryCoord(ctx)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// From here on the local variable testTask shadows the testTask type name.
testTask := &testTask{
|
||||
baseTask: baseTask{
|
||||
ctx: ctx,
|
||||
condition: newTaskCondition(ctx),
|
||||
triggerCondition: querypb.TriggerCondition_grpcRequest,
|
||||
taskID: 100,
|
||||
},
|
||||
baseMsg: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadCollection,
|
||||
},
|
||||
cluster: queryCoord.cluster,
|
||||
meta: queryCoord.meta,
|
||||
nodeID: defaultQueryNodeID,
|
||||
}
|
||||
|
||||
// Both subtests assign to the outer err rather than declaring their own.
t.Run("Test SaveEtcdFail", func(t *testing.T) {
|
||||
// max send size limit of etcd is 2097152
|
||||
// 3000000 is larger than that limit, so processTask is expected to return an error.
testTask.binlogSize = 3000000
|
||||
err = queryCoord.scheduler.processTask(testTask)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test SaveEtcdSuccess", func(t *testing.T) {
|
||||
// Start from an empty child list; presumably the previous subtest left
// child tasks attached to testTask — confirm against processTask.
testTask.childTasks = []task{}
|
||||
// 500000 is smaller than the limit quoted in the previous subtest.
testTask.binlogSize = 500000
|
||||
err = queryCoord.scheduler.processTask(testTask)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue