From f68a3358ddbcc6d4bc99ba0bc4f9df0a6974cc50 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Thu, 21 Oct 2021 12:18:36 +0800 Subject: [PATCH] Fix multiSave childTask failed to etcd (#10340) Signed-off-by: xige-16 --- internal/querycoord/task_scheduler.go | 15 +++- internal/querycoord/task_scheduler_test.go | 83 ++++++++++++++++++++-- 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 2a56244969..425add3044 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -438,9 +438,11 @@ func (scheduler *TaskScheduler) processTask(t task) error { var taskInfoKey string // assign taskID for childTask and update triggerTask's childTask to etcd updateKVFn := func(parentTask task) error { - kvs := make(map[string]string) - kvs[taskInfoKey] = strconv.Itoa(int(taskDone)) + // TODO:: if childTask.type == loadSegment, then only save segmentID to etcd instead of binlog paths + // The binlog paths of each segment will be written into etcd in advance + // The binlog paths will be filled in through etcd when load segment grpc is called for _, childTask := range parentTask.getChildTask() { + kvs := make(map[string]string) id, err := scheduler.taskIDAllocator() if err != nil { return err @@ -454,11 +456,18 @@ func (scheduler *TaskScheduler) processTask(t task) error { kvs[childTaskKey] = string(blobs) stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, childTask.getTaskID()) kvs[stateKey] = strconv.Itoa(int(taskUndo)) + err = scheduler.client.MultiSave(kvs) + if err != nil { + return err + } } - err := scheduler.client.MultiSave(kvs) + + parentInfoKey := fmt.Sprintf("%s/%d", taskInfoPrefix, parentTask.getTaskID()) + err := scheduler.client.Save(parentInfoKey, strconv.Itoa(int(taskDone))) if err != nil { return err } + return nil } diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index ef3f23953e..75153009ff 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -16,20 +16,22 @@ import ( "strconv" "testing" - "github.com/stretchr/testify/assert" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/stretchr/testify/assert" ) type testTask struct { baseTask - baseMsg *commonpb.MsgBase - cluster Cluster - meta Meta - nodeID int64 + baseMsg *commonpb.MsgBase + cluster Cluster + meta Meta + nodeID int64 + binlogSize int } func (tt *testTask) msgBase() *commonpb.MsgBase { @@ -58,6 +60,39 @@ func (tt *testTask) execute(ctx context.Context) error { log.Debug("test task execute...") switch tt.baseMsg.MsgType { + case commonpb.MsgType_LoadCollection: + binlogs := make([]*datapb.FieldBinlog, 0) + binlogs = append(binlogs, &datapb.FieldBinlog{ + FieldID: 0, + Binlogs: []string{funcutil.RandomString(tt.binlogSize)}, + }) + for id := 0; id < 10; id++ { + segmentInfo := &querypb.SegmentLoadInfo{ + SegmentID: UniqueID(id), + PartitionID: defaultPartitionID, + CollectionID: defaultCollectionID, + BinlogPaths: binlogs, + } + req := &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_LoadSegments, + }, + Infos: []*querypb.SegmentLoadInfo{segmentInfo}, + } + loadTask := &loadSegmentTask{ + baseTask: &baseTask{ + ctx: tt.ctx, + condition: newTaskCondition(tt.ctx), + triggerCondition: tt.triggerCondition, + }, + LoadSegmentsRequest: req, + meta: tt.meta, + cluster: tt.cluster, + excludeNodeIDs: []int64{}, + } + loadTask.setParentTask(tt) + tt.addChildTask(loadTask) + } case commonpb.MsgType_LoadSegments: childTask := &loadSegmentTask{ baseTask: &baseTask{ @@ -416,3 +451,39 @@ func TestReloadTaskFromKV(t *testing.T) { assert.Equal(t, taskDone, task.getState()) assert.Equal(t, 1, len(task.getChildTask())) } + +func Test_saveInternalTaskToEtcd(t *testing.T) { + refreshParams() + ctx := context.Background() + queryCoord, err := startQueryCoord(ctx) + assert.Nil(t, err) + + testTask := &testTask{ + baseTask: baseTask{ + ctx: ctx, + condition: newTaskCondition(ctx), + triggerCondition: querypb.TriggerCondition_grpcRequest, + taskID: 100, + }, + baseMsg: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_LoadCollection, + }, + cluster: queryCoord.cluster, + meta: queryCoord.meta, + nodeID: defaultQueryNodeID, + } + + t.Run("Test SaveEtcdFail", func(t *testing.T) { + // max send size limit of etcd is 2097152 + testTask.binlogSize = 3000000 + err = queryCoord.scheduler.processTask(testTask) + assert.NotNil(t, err) + }) + + t.Run("Test SaveEtcdSuccess", func(t *testing.T) { + testTask.childTasks = []task{} + testTask.binlogSize = 500000 + err = queryCoord.scheduler.processTask(testTask) + assert.Nil(t, err) + }) +}