// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/metastore"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
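
// Shared fixture IDs used by the task scheduler tests; related IDs such as
// segID+1 and buildID+1 are derived from these base values.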
var (
collID = UniqueID(100)
partID = UniqueID(200)
indexID = UniqueID(300)
fieldID = UniqueID(400)
indexName = "_default_idx"
segID = UniqueID(500)
buildID = UniqueID(600)
nodeID = UniqueID(700)
partitionKeyID = UniqueID(800)
statsTaskID = UniqueID(900)
)
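
// createIndexMeta builds an indexMeta whose segment index entries cover a mix
// of states (Unissued, InProgress, Finished, Failed, deleted) so the scheduler
// tests can exercise every transition path.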
func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
indexBuildInfo := newSegmentIndexBuildInfo()
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 2,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: true,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 4,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 4,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 5,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 6,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 6,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 7,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 7,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "error",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 8,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 8,
NodeID: nodeID + 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 9,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 9,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 10,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 10,
NodeID: nodeID,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx0.Insert(indexID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx1 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx1.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx2 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx2.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 2,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: true,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx3 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx3.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx4 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx4.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 4,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 4,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx5 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx5.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 5,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx6 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx6.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 6,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 6,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx7 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx7.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 7,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 7,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "error",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx8 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx8.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 8,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 8,
NodeID: nodeID + 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx9 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx9.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 9,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 9,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx10 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx10.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 10,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 10,
NodeID: nodeID,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIndexes.Insert(segID, segIdx0)
segIndexes.Insert(segID+1, segIdx1)
segIndexes.Insert(segID+2, segIdx2)
segIndexes.Insert(segID+3, segIdx3)
segIndexes.Insert(segID+4, segIdx4)
segIndexes.Insert(segID+5, segIdx5)
segIndexes.Insert(segID+6, segIdx6)
segIndexes.Insert(segID+7, segIdx7)
segIndexes.Insert(segID+8, segIdx8)
segIndexes.Insert(segID+9, segIdx9)
segIndexes.Insert(segID+10, segIdx10)
return &indexMeta{
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 1,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
{
Key: common.MetricTypeKey,
Value: "L2",
},
},
},
},
},
segmentIndexes: segIndexes,
segmentBuildInfo: indexBuildInfo,
}
}
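
// testMetaOption customizes the meta built by createMeta, letting each test
// plug in its own analyze, index, or stats task metadata.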
type testMetaOption func(*meta)

func withAnalyzeMeta(am *analyzeMeta) testMetaOption {
return func(mt *meta) {
mt.analyzeMeta = am
}
}

func withIndexMeta(im *indexMeta) testMetaOption {
return func(mt *meta) {
mt.indexMeta = im
}
}

func withStatsTaskMeta(stm *statsTaskMeta) testMetaOption {
return func(mt *meta) {
mt.statsTaskMeta = stm
}
}
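
// createMeta assembles a meta instance backed by the given catalog, with a set
// of flushed segments matching the fixtures above, and applies the provided
// options.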
func createMeta(catalog metastore.DataCoordCatalog, opts ...testMetaOption) *meta {
mt := &meta{
catalog: catalog,
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
1000: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1000,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
1001: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1001,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
1002: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1002,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 3,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 4,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 6: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 6,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 7: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 7,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 8: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 8,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 9: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 9,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 10: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 10,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
},
},
}
for _, opt := range opts {
opt(mt)
}
return mt
}
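
// taskSchedulerSuite exercises the datacoord task scheduler: analyze tasks,
// index build tasks, and their interaction with the mocked catalog, worker
// manager, and index node clients.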
type taskSchedulerSuite struct {
suite.Suite
collectionID int64
partitionID int64
fieldID int64
segmentIDs []int64
nodeID int64
duration time.Duration
}
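
// initParams seeds the suite with the package-level fixture IDs and the short
// polling interval used by the fail-case tests.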
func (s *taskSchedulerSuite) initParams() {
s.collectionID = collID
s.partitionID = partID
s.fieldID = fieldID
s.nodeID = nodeID
s.segmentIDs = []int64{1000, 1001, 1002}
s.duration = time.Millisecond * 100
}
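
// createAnalyzeMeta returns an analyzeMeta holding five tasks in different job
// states (Init with and without an assigned node, Finished, Failed, Retry).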
func (s *taskSchedulerSuite) createAnalyzeMeta(catalog metastore.DataCoordCatalog) *analyzeMeta {
return &analyzeMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.AnalyzeTask{
1: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: 1,
State: indexpb.JobState_JobStateInit,
FieldType: schemapb.DataType_FloatVector,
},
2: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: 2,
NodeID: s.nodeID,
State: indexpb.JobState_JobStateInit,
FieldType: schemapb.DataType_FloatVector,
},
3: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: 3,
NodeID: s.nodeID,
State: indexpb.JobState_JobStateFinished,
FieldType: schemapb.DataType_FloatVector,
},
4: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: 4,
NodeID: s.nodeID,
State: indexpb.JobState_JobStateFailed,
FieldType: schemapb.DataType_FloatVector,
},
5: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: []int64{1001, 1002},
TaskID: 5,
NodeID: s.nodeID,
State: indexpb.JobState_JobStateRetry,
FieldType: schemapb.DataType_FloatVector,
},
},
}
}

func (s *taskSchedulerSuite) SetupSuite() {
paramtable.Init()
s.initParams()
Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("0")
}

func (s *taskSchedulerSuite) TearDownSuite() {
Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("16")
}
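
// scheduler runs a full happy-path scheduling round: it wires mocked catalog,
// worker manager, and index node clients, enqueues an extra analyze task, waits
// until the pending and running queues drain, and then verifies the final
// states recorded in meta.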
func (s *taskSchedulerSuite) scheduler(handler Handler) {
ctx := context.Background()
var once sync.Once
paramtable.Get().Save(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key)
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task *indexpb.AnalyzeTask) error {
once.Do(func() {
time.Sleep(time.Second * 3)
})
return nil
})
catalog.EXPECT().DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().DropAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
// catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
once.Do(func() {
time.Sleep(time.Second * 3)
})
switch request.GetJobType() {
case indexpb.JobType_JobTypeIndexJob:
results := make([]*workerpb.IndexTaskInfo, 0)
for _, buildID := range request.GetTaskIDs() {
results = append(results, &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2", "file3"},
SerializedSize: 1024,
FailReason: "",
CurrentIndexVersion: 1,
IndexStoreVersion: 1,
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &workerpb.IndexJobResults{
Results: results,
},
},
}, nil
case indexpb.JobType_JobTypeAnalyzeJob:
results := make([]*workerpb.AnalyzeResult, 0)
for _, taskID := range request.GetTaskIDs() {
results = append(results, &workerpb.AnalyzeResult{
TaskID: taskID,
State: indexpb.JobState_JobStateFinished,
CentroidsFile: fmt.Sprintf("%d/stats_file", taskID),
FailReason: "",
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{
AnalyzeJobResults: &workerpb.AnalyzeResults{
Results: results,
},
},
}, nil
default:
return &workerpb.QueryJobsV2Response{
Status: merr.Status(errors.New("unknown job type")),
ClusterID: request.GetClusterID(),
}, nil
}
})
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: ctx,
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
}))
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("root")
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
s.Equal(6, scheduler.pendingTasks.TaskCount())
s.Equal(3, scheduler.runningTasks.Len())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(1).GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(2).GetState())
t5, ok := scheduler.runningTasks.Get(5)
s.True(ok)
s.Equal(indexpb.JobState_JobStateRetry, t5.GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState())
t6, ok := scheduler.runningTasks.Get(buildID + 1)
s.True(ok)
s.Equal(indexpb.JobState_JobStateInProgress, t6.GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+3).GetState())
t8, ok := scheduler.runningTasks.Get(buildID + 8)
s.True(ok)
s.Equal(indexpb.JobState_JobStateInProgress, t8.GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+9).GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+10).GetState())
mt.segments.DropSegment(segID + 9)
scheduler.scheduleDuration = time.Millisecond * 500
scheduler.collectMetricsDuration = time.Millisecond * 200
scheduler.Start()
s.Run("Submit", func() {
taskID := int64(6)
newTask := &indexpb.AnalyzeTask{
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: taskID,
}
err := scheduler.meta.analyzeMeta.AddAnalyzeTask(newTask)
s.NoError(err)
t := &analyzeTask{
taskID: taskID,
taskInfo: &workerpb.AnalyzeResult{
TaskID: taskID,
State: indexpb.JobState_JobStateInit,
FailReason: "",
},
}
scheduler.enqueue(t)
})
for {
if scheduler.pendingTasks.TaskCount() == 0 {
// the task may still be waiting to be assigned, so sleep three seconds before checking the running queue.
time.Sleep(time.Second * 3)
taskNum := scheduler.runningTasks.Len()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
scheduler.Stop()
s.Equal(indexpb.JobState_JobStateFinished, mt.analyzeMeta.GetTask(1).GetState())
s.Equal(indexpb.JobState_JobStateFinished, mt.analyzeMeta.GetTask(2).GetState())
s.Equal(indexpb.JobState_JobStateFinished, mt.analyzeMeta.GetTask(3).GetState())
s.Equal(indexpb.JobState_JobStateFailed, mt.analyzeMeta.GetTask(4).GetState())
s.Equal(indexpb.JobState_JobStateFinished, mt.analyzeMeta.GetTask(5).GetState())
s.Equal(indexpb.JobState_JobStateFinished, mt.analyzeMeta.GetTask(6).GetState())
indexJob, exist := mt.indexMeta.GetIndexJob(buildID)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 1)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 2)
s.True(exist)
s.True(indexJob.IsDeleted)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 3)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 4)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 5)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 6)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 7)
s.True(exist)
s.Equal(commonpb.IndexState_Failed, indexJob.IndexState)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 8)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
// segID+9 was dropped above, so the task for its unhealthy segment was removed
_, exist = mt.indexMeta.GetIndexJob(buildID + 9)
s.False(exist)
indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 10)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
}
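
// Test_scheduler drives the happy-path scenario above with materialized view
// support temporarily enabled.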
func (s *taskSchedulerSuite) Test_scheduler() {
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
},
},
}, nil)
s.Run("test scheduler with indexBuilderV1", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
s.scheduler(handler)
})
}
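
// Test_analyzeTaskFailCase covers analyze task failure paths: a dropped segment
// fails the task at PreCheck, and a sequence of catalog and worker errors walks
// the remaining retry task through init and in-progress states before it
// finally finishes.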
func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
s.Run("segment info is nil", func() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.AnalyzeTask{
1: {
CollectionID: s.collectionID,
PartitionID: s.partitionID,
FieldID: s.fieldID,
SegmentIDs: s.segmentIDs,
TaskID: 1,
State: indexpb.JobState_JobStateInit,
},
},
}),
withIndexMeta(&indexMeta{
ctx: ctx,
keyLock: lock.NewKeyLock[UniqueID](),
catalog: catalog,
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
segmentBuildInfo: newSegmentIndexBuildInfo(),
}),
withStatsTaskMeta(&statsTaskMeta{
ctx: ctx,
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
}))
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
mt.segments.DropSegment(1000)
scheduler.scheduleDuration = s.duration
scheduler.Start()
// taskID 1 PreCheck failed --> state: Failed --> save
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(nil, false).Once()
for {
if scheduler.pendingTasks.TaskCount() == 0 {
taskNum := scheduler.runningTasks.Len()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
scheduler.Stop()
s.Equal(indexpb.JobState_JobStateFailed, mt.analyzeMeta.GetTask(1).GetState())
})
s.Run("etcd save failed", func() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropAnalyzeTask(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)),
withIndexMeta(&indexMeta{
ctx: ctx,
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
segmentBuildInfo: newSegmentIndexBuildInfo(),
}), withStatsTaskMeta(&statsTaskMeta{
ctx: ctx,
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
}))
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.fieldID,
Name: "vec", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "10"},
},
},
},
},
}, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
// remove task in meta
err := scheduler.meta.analyzeMeta.DropAnalyzeTask(context.TODO(), 1)
s.NoError(err)
err = scheduler.meta.analyzeMeta.DropAnalyzeTask(context.TODO(), 2)
s.NoError(err)
mt.segments.DropSegment(1000)
scheduler.scheduleDuration = s.duration
scheduler.Start()
// taskID 5 state retry, drop task on worker --> state: Init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// update version failed --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update version error")).Once()
// assign task to indexNode fail --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{
Code: 65535,
Retriable: false,
Detail: "",
ExtraInfo: nil,
Reason: "mock error",
}, nil).Once()
// drop task failed --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Status(errors.New("drop job failed")), nil).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// update state to building failed --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update building state error")).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// assign success --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// query result InProgress --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
results := make([]*workerpb.AnalyzeResult, 0)
for _, taskID := range request.GetTaskIDs() {
results = append(results, &workerpb.AnalyzeResult{
TaskID: taskID,
State: indexpb.JobState_JobStateInProgress,
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{
AnalyzeJobResults: &workerpb.AnalyzeResults{
Results: results,
},
},
}, nil
}).Once()
// query result Retry --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
results := make([]*workerpb.AnalyzeResult, 0)
for _, taskID := range request.GetTaskIDs() {
results = append(results, &workerpb.AnalyzeResult{
TaskID: taskID,
State: indexpb.JobState_JobStateRetry,
FailReason: "node analyze data failed",
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{
AnalyzeJobResults: &workerpb.AnalyzeResults{
Results: results,
},
},
}, nil
}).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// query result failed --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).Return(&workerpb.QueryJobsV2Response{
Status: merr.Status(errors.New("query job failed")),
}, nil).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// query result not exists --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).Return(&workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: "",
Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{},
}, nil).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// node not exist --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(nil, false).Once()
// retry --> state: init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// query result success --> state: finished
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
results := make([]*workerpb.AnalyzeResult, 0)
for _, taskID := range request.GetTaskIDs() {
results = append(results, &workerpb.AnalyzeResult{
TaskID: taskID,
State: indexpb.JobState_JobStateFinished,
//CentroidsFile: fmt.Sprintf("%d/stats_file", taskID),
//SegmentOffsetMappingFiles: map[int64]string{
// 1000: "1000/offset_mapping",
// 1001: "1001/offset_mapping",
// 1002: "1002/offset_mapping",
//},
FailReason: "",
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{
AnalyzeJobResults: &workerpb.AnalyzeResults{
Results: results,
},
},
}, nil
}).Once()
// set job info failed --> state: Finished
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("set job info failed")).Once()
// set job success, drop job on task failed --> state: Finished
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Status(errors.New("drop job failed")), nil).Once()
// drop job success --> no task
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
for {
if scheduler.pendingTasks.TaskCount() == 0 {
taskNum := scheduler.runningTasks.Len()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
scheduler.Stop()
})
}
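
// Test_indexTaskFailCase walks a single HNSW index build task through its
// failure branches (missing collection info, missing dim, assign failure, drop
// and retry) until it reaches Finished, and verifies no job is left behind on
// the mocked index node.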
func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
s.Run("HNSW", func() {
ctx := context.Background()
indexNodeTasks := make(map[int64]int)
catalog := catalogmocks.NewDataCoordCatalog(s.T())
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx.Insert(indexID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: s.collectionID,
PartitionID: s.partitionID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Unissued,
})
segIndexes.Insert(segID, segIdx)
mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
ctx: context.Background(),
catalog: catalog,
}),
withIndexMeta(&indexMeta{
fieldIndexLock: sync.RWMutex{},
ctx: ctx,
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
s.collectionID: {
indexID: {
CollectionID: s.collectionID,
FieldID: s.fieldID,
IndexID: indexID,
IndexName: indexName,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
{
Key: common.MetricTypeKey,
Value: "L2",
},
},
},
},
},
segmentBuildInfo: newSegmentIndexBuildInfo(),
segmentIndexes: segIndexes,
}),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
}))
mt.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID,
CollectionID: s.collectionID,
PartitionID: s.partitionID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Unissued,
})
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("ut-index")
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False")
scheduler.Start()
// get collection info failed --> init
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collID int64) (*collectionInfo, error) {
log.Debug("get collection info failed", zap.Int64("collectionID", collID))
return nil, errors.New("mock error")
}).Once()
// get collection info success, get dim failed --> init
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) {
log.Debug("get collection info success", zap.Int64("collectionID", i))
return &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec"},
},
},
}, nil
}).Once()
// assign failed --> retry
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("get client success, but assign failed", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("alter segment indexes success, but assign failed", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
indexNodeTasks[request.GetTaskID()]++
log.Debug("assign task failed", zap.Int64("taskID", request.GetTaskID()))
return nil, errors.New("mock error")
}).Once()
// retry --> init
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("assign failed, drop task on worker", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
for _, taskID := range request.GetTaskIDs() {
indexNodeTasks[taskID]--
}
log.Debug("drop task on worker, success", zap.Int64s("taskIDs", request.GetTaskIDs()))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
// init --> inProgress
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("assign task success", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("alter segment success twice and assign task success", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Twice()
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) {
log.Debug("get collection success and assign task success", zap.Int64("collID", i))
return &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
},
},
}, nil
}).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
indexNodeTasks[request.GetTaskID()]++
log.Debug("assign task success", zap.Int64("nodeID", nodeID))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
// inProgress --> Finished
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("get task result success, task is finished", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
log.Debug("query task result success, task is finished", zap.Int64s("taskIDs", request.GetTaskIDs()))
return &workerpb.QueryJobsV2Response{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ClusterID: "",
Result: &workerpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &workerpb.IndexJobResults{
Results: []*workerpb.IndexTaskInfo{
{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
},
},
},
},
}, nil
})
// finished --> done
finishCH := make(chan struct{})
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("task is finished, alter segment index success", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("task is finished, drop task on worker", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
for _, taskID := range request.GetTaskIDs() {
indexNodeTasks[taskID]--
finishCH <- struct{}{}
}
log.Debug("task is finished, drop task on worker success", zap.Int64("nodeID", nodeID))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
<-finishCH
for {
if scheduler.pendingTasks.TaskCount() == 0 {
taskNum := scheduler.runningTasks.Len()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
scheduler.Stop()
indexJob, exist := mt.indexMeta.GetIndexJob(buildID)
s.True(exist)
s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
for _, v := range indexNodeTasks {
s.Zero(v)
}
})
}
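
// Test_taskSchedulerSuite is the testify entry point for taskSchedulerSuite.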
func Test_taskSchedulerSuite(t *testing.T) {
suite.Run(t, new(taskSchedulerSuite))
}
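
// Test_indexTaskWithMvOptionalScalarField verifies which optional scalar fields
// are attached to index build requests as the materialized view switch and the
// partition key schema change, and how partition key isolation is propagated to
// the worker.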
func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1
fieldsSchema := []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.MetricTypeKey,
Value: "L2",
},
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
},
{
FieldID: partitionKeyID,
Name: "scalar",
DataType: schemapb.DataType_VarChar,
IsPartitionKey: true,
},
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collID, &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: fieldsSchema,
},
CreatedAt: 0,
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx.Insert(indexID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: minNumberOfRowsToBuild,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 0,
})
segIndexes.Insert(segID, segIdx)
mt := meta{
catalog: catalog,
collections: collections,
analyzeMeta: &analyzeMeta{
ctx: context.Background(),
catalog: catalog,
},
indexMeta: &indexMeta{
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 1,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.MetricTypeKey,
Value: "L2",
},
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
},
},
},
segmentIndexes: segIndexes,
segmentBuildInfo: newSegmentIndexBuildInfo(),
},
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: minNumberOfRowsToBuild,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
},
},
statsTaskMeta: &statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
},
}
mt.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: minNumberOfRowsToBuild,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 0,
})
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("ut-index")
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: fieldsSchema,
EnableDynamicField: false,
Properties: nil,
},
}, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
waitTaskDoneFunc := func(sche *taskScheduler) {
for {
time.Sleep(time.Second * 3)
if sche.pendingTasks.TaskCount() == 0 {
taskNum := sche.runningTasks.Len()
if taskNum == 0 {
break
}
}
}
}
resetMetaFunc := func() {
mt.indexMeta.keyLock.Lock(buildID)
t, ok := mt.indexMeta.segmentBuildInfo.Get(buildID)
s.True(ok)
t.IndexState = commonpb.IndexState_Unissued
mt.indexMeta.segmentBuildInfo.Add(t)
segIdxes, ok := mt.indexMeta.segmentIndexes.Get(segID)
s.True(ok)
t, ok = segIdxes.Get(indexID)
s.True(ok)
t.IndexState = commonpb.IndexState_Unissued
segIdxes.Insert(indexID, t)
mt.indexMeta.segmentIndexes.Insert(segID, segIdxes)
mt.indexMeta.keyLock.Unlock(buildID)
mt.indexMeta.fieldIndexLock.Lock()
defer mt.indexMeta.fieldIndexLock.Unlock()
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW"
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[0].DataType = schemapb.DataType_FloatVector
coll.Schema.Fields[1].IsPartitionKey = true
coll.Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
switch request.GetJobType() {
case indexpb.JobType_JobTypeIndexJob:
results := make([]*workerpb.IndexTaskInfo, 0)
for _, buildID := range request.GetTaskIDs() {
results = append(results, &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
FailReason: "",
CurrentIndexVersion: 0,
IndexStoreVersion: 0,
})
}
return &workerpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &workerpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &workerpb.IndexJobResults{
Results: results,
},
},
}, nil
default:
return &workerpb.QueryJobsV2Response{
Status: merr.Status(errors.New("unknown job type")),
}, nil
}
})
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
s.Run("success to get opt field on startup", func() {
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
s.Equal(1, scheduler.pendingTasks.TaskCount())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState())
scheduler.Start()
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
s.Run("Submit valid", func() {
for _, dataType := range []schemapb.DataType{
schemapb.DataType_Int8,
schemapb.DataType_Int16,
schemapb.DataType_Int32,
schemapb.DataType_Int64,
schemapb.DataType_VarChar,
schemapb.DataType_String,
} {
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
}
})
// should still be able to build vec index when opt field is not set
s.Run("Submit returns empty optional field when cfg disable", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set when materialized view is disabled")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
s.Run("Submit returns empty when vector type is not dense vector", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
for _, dataType := range []schemapb.DataType{
schemapb.DataType_SparseFloatVector,
} {
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[0].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
}
})
s.Run("Submit returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
for _, dataType := range []schemapb.DataType{
schemapb.DataType_Bool,
schemapb.DataType_Float,
schemapb.DataType_Double,
schemapb.DataType_Array,
schemapb.DataType_JSON,
} {
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
}
})
s.Run("Submit returns empty optional field when no partition key", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].IsPartitionKey = false
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
s.Run("Submit partitionKeyIsolation is false when schema is not set", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.False(in.GetIndexRequest().PartitionKeyIsolation)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
scheduler.Stop()
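// The partition-key-isolation subtests need a handler whose collection carries
// the isolation property, so stop the shared scheduler and build a dedicated one.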
isoCollInfo := &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: fieldsSchema,
EnableDynamicField: false,
},
Properties: map[string]string{
common.PartitionKeyIsolationKey: "false",
},
}
handler_isolation := NewNMockHandler(s.T())
handler_isolation.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(isoCollInfo, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil, nil)
scheduler_isolation.Start()
s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.False(in.GetIndexRequest().PartitionKeyIsolation)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
s.Run("Submit partitionKeyIsolation is true when MV enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true"
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.True(in.GetIndexRequest().PartitionKeyIsolation)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
s.Run("Submit partitionKeyIsolation is invalid when MV is enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "invalid"
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.False(in.GetIndexRequest().PartitionKeyIsolation)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
scheduler_isolation.Stop()
}
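// Test_reload verifies how stats tasks persisted in meta are restored when a new
// task scheduler is created: reloadable tasks go back into the running queue, while
// tasks whose source segment can no longer be processed are dropped from meta.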
func (s *taskSchedulerSuite) Test_reload() {
s.Run("normal case", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
statsTask := &indexpb.StatsTask{
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
}
tasks.Insert(statsTaskID, statsTask)
secondaryIndex := typeutil.NewConcurrentMap[string, *indexpb.StatsTask]()
secondaryKey := createSecondaryIndexKey(statsTask.GetSegmentID(), statsTask.GetSubJobType().String())
secondaryIndex.Insert(secondaryKey, statsTask)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: tasks,
segmentID2Tasks: secondaryIndex,
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.runningTasks.Get(statsTaskID)
s.True(ok)
s.NotNil(task)
})
s.Run("segment is compacting", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
statsTask := &indexpb.StatsTask{
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
}
tasks.Insert(statsTaskID, statsTask)
secondaryIndex := typeutil.NewConcurrentMap[string, *indexpb.StatsTask]()
secondaryKey := createSecondaryIndexKey(statsTask.GetSegmentID(), statsTask.GetSubJobType().String())
secondaryIndex.Insert(secondaryKey, statsTask)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: tasks,
segmentID2Tasks: secondaryIndex,
keyLock: lock.NewKeyLock[UniqueID](),
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task := scheduler.pendingTasks.Get(statsTaskID)
s.Nil(task)
})
s.Run("drop task failed", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
statsTask := &indexpb.StatsTask{
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
}
tasks.Insert(statsTaskID, statsTask)
secondaryIndex := typeutil.NewConcurrentMap[string, *indexpb.StatsTask]()
secondaryKey := createSecondaryIndexKey(statsTask.GetSegmentID(), statsTask.GetSubJobType().String())
secondaryIndex.Insert(secondaryKey, statsTask)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: tasks,
segmentID2Tasks: secondaryIndex,
keyLock: lock.NewKeyLock[UniqueID](),
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.runningTasks.Get(statsTaskID)
s.True(ok)
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
s.Run("segment is in l0 compaction", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
statsTask := &indexpb.StatsTask{
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
}
tasks.Insert(statsTaskID, statsTask)
secondaryIndex := typeutil.NewConcurrentMap[string, *indexpb.StatsTask]()
secondaryKey := createSecondaryIndexKey(statsTask.GetSegmentID(), statsTask.GetSubJobType().String())
secondaryIndex.Insert(secondaryKey, statsTask)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: tasks,
segmentID2Tasks: secondaryIndex,
keyLock: lock.NewKeyLock[UniqueID](),
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe()
mt.segments.segments[1000].isCompacting = false
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task := scheduler.pendingTasks.Get(statsTaskID)
s.Nil(task)
})
s.Run("drop task failed", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
statsTask := &indexpb.StatsTask{
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
}
tasks.Insert(statsTaskID, statsTask)
secondaryIndex := typeutil.NewConcurrentMap[string, *indexpb.StatsTask]()
secondaryKey := createSecondaryIndexKey(statsTask.GetSegmentID(), statsTask.GetSubJobType().String())
secondaryIndex.Insert(secondaryKey, statsTask)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: tasks,
segmentID2Tasks: secondaryIndex,
keyLock: lock.NewKeyLock[UniqueID](),
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe()
mt.segments.segments[1000].isCompacting = false
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.runningTasks.Get(statsTaskID)
s.True(ok)
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
}
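// Test_zeroSegmentStats schedules a sort stats task on a flushed segment with zero
// rows and expects the resulting target segment to end up dropped with zero rows.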
func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
taskID := UniqueID(111)
segID := UniqueID(112)
targetSegID := UniqueID(113)
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]*session.WorkerSlots {
return map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
TotalSlots: 16,
AvailableSlots: 16,
},
}
})
mt := &meta{
ctx: ctx,
catalog: catalog,
segments: NewSegmentsInfo(),
statsTaskMeta: &statsTaskMeta{
ctx: ctx,
catalog: catalog,
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
keyLock: lock.NewKeyLock[UniqueID](),
},
}
mt.statsTaskMeta.tasks.Insert(taskID, &indexpb.StatsTask{
CollectionID: 1,
PartitionID: 2,
SegmentID: segID,
InsertChannel: "ch-1",
TaskID: taskID,
Version: 0,
NodeID: 0,
State: indexpb.JobState_JobStateInit,
FailReason: "",
TargetSegmentID: targetSegID,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
})
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
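// Register a flushed source segment with zero rows for the stats task to process.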
err := mt.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "ch-1",
State: commonpb.SegmentState_Flushed,
NumOfRows: 0,
},
})
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
handler := NewNMockHandler(s.T())
ctx, cancel := context.WithCancel(ctx)
scheduler := &taskScheduler{
ctx: ctx,
cancel: cancel,
meta: mt,
pendingTasks: newPriorityQueuePolicy(),
runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
collectMetricsDuration: time.Minute,
policy: defaultBuildIndexPolicy,
nodeManager: workerManager,
chunkManager: cm,
handler: handler,
indexEngineVersionManager: newIndexEngineVersionManager(),
allocator: nil,
taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*15),
compactionHandler: nil,
}
scheduler.Start()
scheduler.enqueue(newStatsTask(taskID, segID, targetSegID, indexpb.StatsSubJob_Sort, 1))
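// Wait until both the pending queue and the running task set are drained.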
for {
time.Sleep(time.Second)
if scheduler.pendingTasks.TaskCount() == 0 && scheduler.runningTasks.Len() == 0 {
break
}
}
scheduler.Stop()
segment := mt.GetSegment(ctx, targetSegID)
s.Equal(int64(0), segment.NumOfRows)
s.Equal(commonpb.SegmentState_Dropped, segment.State)
}