mirror of https://github.com/milvus-io/milvus.git

enhance: improve ut cov of clustering compaction task (#35242)

issue: #34792
Signed-off-by: wayblink <anyang.wang@zilliz.com>

pull/35343/head
parent bb15ecdc13
commit c6253f9c9b
@@ -290,10 +290,15 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
 func (t *clusteringCompactionTask) processIndexing() error {
     // wait for segment indexed
     collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
+    if len(collectionIndexes) == 0 {
+        log.Debug("the collection has no index, no need to do indexing")
+        return t.completeTask()
+    }
     indexed := func() bool {
         for _, collectionIndex := range collectionIndexes {
             for _, segmentID := range t.ResultSegments {
                 segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetCollectionID(), segmentID, collectionIndex.IndexID)
+                log.Debug("segment index state", zap.String("segment", segmentIndexState.String()))
                 if segmentIndexState.GetState() != commonpb.IndexState_Finished {
                     return false
                 }
@@ -303,7 +308,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
     }()
     log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
     if indexed {
-        t.completeTask()
+        return t.completeTask()
     }
     return nil
 }
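Two behavioral changes ride along with the coverage work. First, a collection with no index now completes immediately instead of waiting for segments that will never be indexed. Second, the error from `t.completeTask()` is now returned instead of being dropped. A minimal sketch (hypothetical names, not the Milvus implementation) of why the bare call was a bug:

// Sketch only: a bare call discards the error, so a failed meta save
// is reported to the caller as success.
package main

import (
    "errors"
    "fmt"
)

func completeTask() error { return errors.New("save partition stats failed") }

func processIndexingBuggy() error {
    completeTask() // error silently dropped; the state machine assumes success
    return nil
}

func processIndexingFixed() error {
    return completeTask() // caller sees the failure and can retry the transition
}

func main() {
    fmt.Println(processIndexingBuggy()) // <nil>
    fmt.Println(processIndexingFixed()) // save partition stats failed
}

The remaining hunks below touch the second file in the commit, the test suite for the clustering compaction task.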
@@ -29,7 +29,9 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
+    "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/proto/datapb"
+    "github.com/milvus-io/milvus/internal/proto/indexpb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/util/merr"
@@ -42,18 +44,20 @@ func TestClusteringCompactionTaskSuite(t *testing.T) {
 type ClusteringCompactionTaskSuite struct {
     suite.Suite

     mockID      atomic.Int64
     mockAlloc   *NMockAllocator
     meta        *meta
     mockSessMgr *MockSessionManager
     handler     *NMockHandler
     session     *MockSessionManager
+    analyzeScheduler *taskScheduler
 }

 func (s *ClusteringCompactionTaskSuite) SetupTest() {
+    ctx := context.Background()
     cm := storage.NewLocalChunkManager(storage.RootPath(""))
     catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
-    meta, err := newMeta(context.TODO(), catalog, cm)
+    meta, err := newMeta(ctx, catalog, cm)
     s.NoError(err)
     s.meta = meta

@@ -75,6 +79,9 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
     s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()

     s.session = NewMockSessionManager(s.T())

+    scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil)
+    s.analyzeScheduler = scheduler
 }

 func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
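The suite now owns a `taskScheduler` for analyze tasks, wired up in `SetupTest`. The tests rely on testify's suite lifecycle, where `SetupTest` runs before every test method so each test sees fresh state. A self-contained sketch of that guarantee (invented `DemoSuite`, not from this PR):

package example

import (
    "testing"

    "github.com/stretchr/testify/suite"
)

type DemoSuite struct {
    suite.Suite
    counter int // re-initialized before every test by SetupTest
}

func (s *DemoSuite) SetupTest() { s.counter = 0 }

func (s *DemoSuite) TestA() {
    s.counter++
    s.Equal(1, s.counter)
}

func (s *DemoSuite) TestB() {
    s.counter++
    s.Equal(1, s.counter) // not 2: TestA's increment did not leak
}

func TestDemoSuite(t *testing.T) {
    suite.Run(t, new(DemoSuite))
}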
@@ -82,8 +89,6 @@ func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
 }

 func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
-    channel := "Ch-1"
-
     s.meta.AddSegment(context.TODO(), &SegmentInfo{
         SegmentInfo: &datapb.SegmentInfo{
             ID: 101,
@@ -99,40 +104,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
             PartitionStatsVersion: 10000,
         },
     })
-    session := NewSessionManagerImpl()
+    s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)

-    schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
-    pk := &schemapb.FieldSchema{
-        FieldID:         100,
-        Name:            Int64Field,
-        IsPrimaryKey:    true,
-        Description:     "",
-        DataType:        schemapb.DataType_Int64,
-        TypeParams:      nil,
-        IndexParams:     nil,
-        AutoID:          true,
-        IsClusteringKey: true,
-    }
-
-    task := &clusteringCompactionTask{
-        CompactionTask: &datapb.CompactionTask{
-            PlanID:             1,
-            TriggerID:          19530,
-            CollectionID:       1,
-            PartitionID:        10,
-            Channel:            channel,
-            Type:               datapb.CompactionType_ClusteringCompaction,
-            NodeID:             1,
-            State:              datapb.CompactionTaskState_pipelining,
-            Schema:             schema,
-            ClusteringKeyField: pk,
-            InputSegments:      []int64{101, 102},
-            ResultSegments:     []int64{1000, 1100},
-        },
-        meta:      s.meta,
-        sessions:  session,
-        allocator: s.mockAlloc,
-    }
+    task := s.generateBasicTask(false)

     task.processPipelining()

@@ -179,41 +153,49 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
     s.Equal(int64(0), seg42.PartitionStatsVersion)
 }

-func (s *ClusteringCompactionTaskSuite) generateBasicTask() *clusteringCompactionTask {
-    schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
-    pk := &schemapb.FieldSchema{
-        FieldID:         100,
-        Name:            Int64Field,
-        IsPrimaryKey:    true,
-        DataType:        schemapb.DataType_Int64,
-        AutoID:          true,
-        IsClusteringKey: true,
-    }
-
-    task := &clusteringCompactionTask{
-        CompactionTask: &datapb.CompactionTask{
-            PlanID:             1,
-            TriggerID:          19530,
-            CollectionID:       1,
-            PartitionID:        10,
-            Type:               datapb.CompactionType_ClusteringCompaction,
-            NodeID:             1,
-            State:              datapb.CompactionTaskState_pipelining,
-            Schema:             schema,
-            ClusteringKeyField: pk,
-            InputSegments:      []int64{101, 102},
-            ResultSegments:     []int64{1000, 1100},
-        },
-        meta:      s.meta,
-        handler:   s.handler,
-        sessions:  s.session,
-        allocator: s.mockAlloc,
-    }
+func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
+    schema := ConstructClusteringSchema("TestClusteringCompactionTask", 32, true, vectorClusteringKey)
+    var pk *schemapb.FieldSchema
+    if vectorClusteringKey {
+        pk = &schemapb.FieldSchema{
+            FieldID:         101,
+            Name:            FloatVecField,
+            IsPrimaryKey:    false,
+            DataType:        schemapb.DataType_FloatVector,
+            IsClusteringKey: true,
+        }
+    } else {
+        pk = &schemapb.FieldSchema{
+            FieldID:         100,
+            Name:            Int64Field,
+            IsPrimaryKey:    true,
+            DataType:        schemapb.DataType_Int64,
+            AutoID:          true,
+            IsClusteringKey: true,
+        }
+    }

+    compactionTask := &datapb.CompactionTask{
+        PlanID:             1,
+        TriggerID:          19530,
+        CollectionID:       1,
+        PartitionID:        10,
+        Type:               datapb.CompactionType_ClusteringCompaction,
+        NodeID:             1,
+        State:              datapb.CompactionTaskState_pipelining,
+        Schema:             schema,
+        ClusteringKeyField: pk,
+        InputSegments:      []int64{101, 102},
+        ResultSegments:     []int64{1000, 1100},
+    }
+
+    task := newClusteringCompactionTask(compactionTask, s.mockAlloc, s.meta, s.session, s.handler, s.analyzeScheduler)
+    task.maxRetryTimes = 0
     return task
 }

 func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
-    task := s.generateBasicTask()
+    task := s.generateBasicTask(false)
     task.maxRetryTimes = 3
     // process pipelining fail
     s.Equal(false, task.Process())
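`generateBasicTask` now defaults `maxRetryTimes` to 0, so the failure-path subtests below fail fast, while `TestProcessRetryLogic` raises it to 3 to watch the retry counter advance before the task is marked failed. A reduced sketch of the retry semantics these assertions assume (simplified, not the actual `clusteringCompactionTask` logic):

package main

import "fmt"

type task struct {
    retryTimes    int
    maxRetryTimes int
    state         string
}

// process runs one step; errors become retries until the budget is spent.
func (t *task) process(step func() error) bool {
    if err := step(); err != nil {
        if t.retryTimes < t.maxRetryTimes {
            t.retryTimes++ // stay in the current state, try again on the next tick
            return false
        }
        t.state = "failed"
        return false
    }
    t.state = "done"
    return true
}

func main() {
    t := &task{maxRetryTimes: 3, state: "pipelining"}
    failing := func() error { return fmt.Errorf("no segment") }
    for i := 0; i < 4; i++ {
        t.process(failing)
    }
    fmt.Println(t.state, t.retryTimes) // failed 3
}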
@@ -228,96 +210,242 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
     s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 }

-func (s *ClusteringCompactionTaskSuite) TestProcessStateChange() {
-    task := s.generateBasicTask()
-    // process pipelining fail
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_failed, task.GetState())
-
-    // process pipelining succeed
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:    101,
-            State: commonpb.SegmentState_Flushed,
-            Level: datapb.SegmentLevel_L1,
-        },
-    })
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:                    102,
-            State:                 commonpb.SegmentState_Flushed,
-            Level:                 datapb.SegmentLevel_L2,
-            PartitionStatsVersion: 10000,
-        },
-    })
-    s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
-    task.State = datapb.CompactionTaskState_pipelining
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-
-    // process executing
-    s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
-
-    // repipelining
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
-    task.NodeID = 1
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-
-    // process executing
-    s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-    s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-        State: datapb.CompactionTaskState_executing,
-    }, nil).Once()
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-    s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-        State: datapb.CompactionTaskState_completed,
-        Segments: []*datapb.CompactionSegment{
-            {
-                SegmentID: 1000,
-            },
-            {
-                SegmentID: 1001,
-            },
-        },
-    }, nil).Once()
-    s.Equal(false, task.Process())
-    s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
+    s.Run("process pipelining fail, segment not found", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_pipelining
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+    })
+
+    s.Run("pipelining fail, no datanode slot", func() {
+        task := s.generateBasicTask(false)
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
+        task.State = datapb.CompactionTaskState_pipelining
+        s.False(task.Process())
+        s.Equal(int64(NullNodeID), task.GetNodeID())
+    })
+
+    s.Run("process succeed, scalar clustering key", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_pipelining
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+        task.State = datapb.CompactionTaskState_pipelining
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+    })
+
+    s.Run("process succeed, vector clustering key", func() {
+        task := s.generateBasicTask(true)
+        task.State = datapb.CompactionTaskState_pipelining
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        task.State = datapb.CompactionTaskState_pipelining
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_analyzing, task.GetState())
+    })
 }

 func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:    101,
-            State: commonpb.SegmentState_Flushed,
-            Level: datapb.SegmentLevel_L1,
-        },
-    })
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:                    102,
-            State:                 commonpb.SegmentState_Flushed,
-            Level:                 datapb.SegmentLevel_L2,
-            PartitionStatsVersion: 10000,
-        },
-    })
-    task := s.generateBasicTask()
-    s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
-    task.State = datapb.CompactionTaskState_pipelining
-    s.NoError(task.doCompact())
-    s.Equal(int64(NullNodeID), task.GetNodeID())
+    s.Run("process executing, get compaction result fail", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_executing
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
+    })
+
+    s.Run("process executing, compaction result not ready", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_executing
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+            State: datapb.CompactionTaskState_executing,
+        }, nil).Once()
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+    })
+
+    s.Run("process executing, scalar clustering key, compaction result ready", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_executing
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+            State: datapb.CompactionTaskState_completed,
+            Segments: []*datapb.CompactionSegment{
+                {
+                    SegmentID: 1000,
+                },
+                {
+                    SegmentID: 1001,
+                },
+            },
+        }, nil).Once()
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+    })
+
+    s.Run("process executing, compaction result ready", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_executing
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+            State: datapb.CompactionTaskState_completed,
+            Segments: []*datapb.CompactionSegment{
+                {
+                    SegmentID: 1000,
+                },
+                {
+                    SegmentID: 1001,
+                },
+            },
+        }, nil).Once()
+        s.Equal(false, task.Process())
+        s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+    })
+
+    s.Run("process executing, compaction result timeout", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_executing
+        task.StartTime = time.Now().Unix()
+        task.TimeoutInSeconds = 1
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+            State: datapb.CompactionTaskState_executing,
+            Segments: []*datapb.CompactionSegment{
+                {
+                    SegmentID: 1000,
+                },
+                {
+                    SegmentID: 1001,
+                },
+            },
+        }, nil).Once()
+        time.Sleep(time.Second * 1)
+        s.Equal(true, task.Process())
+        s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
+    })
 }

 func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
-    task := s.generateBasicTask()
+    task := s.generateBasicTask(false)
     s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
         State: datapb.CompactionTaskState_failed,
     }, nil).Once()
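These executing-state subtests lean on mockery's expectation queueing: each `.Once()` arms the generated mock for exactly one invocation, so consecutive `task.Process()` calls observe different plan results in order. A self-contained sketch of that mechanism using plain testify/mock rather than the generated `MockSessionManager` (all names here invented):

package example

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
)

type planClient struct{ mock.Mock }

func (c *planClient) GetResult(id int64) (string, error) {
    args := c.Called(id)
    return args.String(0), args.Error(1)
}

func TestOnceSequencing(t *testing.T) {
    c := new(planClient)
    // Each Once() limits the expectation to a single call; identical
    // expectations are consumed in the order they were declared.
    c.On("GetResult", int64(1)).Return("executing", nil).Once()
    c.On("GetResult", int64(1)).Return("completed", nil).Once()

    r1, _ := c.GetResult(1)
    r2, _ := c.GetResult(1)
    require.Equal(t, "executing", r1)
    require.Equal(t, "completed", r2)
    c.AssertExpectations(t) // fails if either expectation went unused
}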
@@ -325,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
     s.Equal(datapb.CompactionTaskState_failed, task.GetState())

     s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-        State: datapb.CompactionTaskState_indexing,
+        State: datapb.CompactionTaskState_failed,
     }, nil).Once()
     s.NoError(task.processExecuting())
     s.Equal(datapb.CompactionTaskState_failed, task.GetState())
@@ -357,9 +485,138 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
     s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 }

+func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
+    s.Run("collection has no index", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_indexing
+        s.True(task.Process())
+        s.Equal(datapb.CompactionTaskState_completed, task.GetState())
+    })
+
+    s.Run("collection has index, segment is not indexed", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_indexing
+        index := &model.Index{
+            CollectionID: 1,
+            IndexID:      3,
+        }
+        err := s.meta.indexMeta.CreateIndex(index)
+        s.NoError(err)
+
+        s.False(task.Process())
+        s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+    })
+
+    s.Run("collection has index, segment indexed", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_indexing
+        index := &model.Index{
+            CollectionID: 1,
+            IndexID:      3,
+        }
+        err := s.meta.indexMeta.CreateIndex(index)
+        s.NoError(err)
+        s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
+            IndexID:      3,
+            SegmentID:    1000,
+            CollectionID: 1,
+            IndexState:   commonpb.IndexState_Finished,
+        })
+        s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
+            IndexID:      3,
+            SegmentID:    1100,
+            CollectionID: 1,
+            IndexState:   commonpb.IndexState_Finished,
+        })
+
+        s.True(task.Process())
+        s.Equal(datapb.CompactionTaskState_completed, task.GetState())
+    })
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
+    s.Run("analyze task not found", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_analyzing
+        s.False(task.Process())
+        s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+    })
+
+    s.Run("analyze task failed", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_analyzing
+        task.AnalyzeTaskID = 7
+        t := &indexpb.AnalyzeTask{
+            CollectionID: task.CollectionID,
+            PartitionID:  task.PartitionID,
+            FieldID:      task.ClusteringKeyField.FieldID,
+            SegmentIDs:   task.InputSegments,
+            TaskID:       7,
+            State:        indexpb.JobState_JobStateFailed,
+        }
+        s.meta.analyzeMeta.AddAnalyzeTask(t)
+        s.False(task.Process())
+        s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+    })
+
+    s.Run("analyze task fake finish, vector not support", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_analyzing
+        task.AnalyzeTaskID = 7
+        t := &indexpb.AnalyzeTask{
+            CollectionID:  task.CollectionID,
+            PartitionID:   task.PartitionID,
+            FieldID:       task.ClusteringKeyField.FieldID,
+            SegmentIDs:    task.InputSegments,
+            TaskID:        7,
+            State:         indexpb.JobState_JobStateFinished,
+            CentroidsFile: "",
+        }
+        s.meta.analyzeMeta.AddAnalyzeTask(t)
+        s.False(task.Process())
+        s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+    })
+
+    s.Run("analyze task finished", func() {
+        task := s.generateBasicTask(false)
+        task.State = datapb.CompactionTaskState_analyzing
+        task.AnalyzeTaskID = 7
+        t := &indexpb.AnalyzeTask{
+            CollectionID:  task.CollectionID,
+            PartitionID:   task.PartitionID,
+            FieldID:       task.ClusteringKeyField.FieldID,
+            SegmentIDs:    task.InputSegments,
+            TaskID:        7,
+            State:         indexpb.JobState_JobStateFinished,
+            CentroidsFile: "somewhere",
+        }
+        s.meta.analyzeMeta.AddAnalyzeTask(t)
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+        s.False(task.Process())
+        s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+    })
+}
+
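The analyzing-state subtests pivot on `CentroidsFile`: a `JobStateFailed` analyze task fails the compaction, a finished task with an empty centroids file is treated as a "fake finish" and also fails, and only a non-empty path lets the task advance to executing. A reduced sketch of that guard (assumed shape, not the exact datacoord code):

package main

import (
    "errors"
    "fmt"
)

func checkAnalyzeResult(finished bool, centroidsFile string) error {
    if !finished {
        return errors.New("analyze task not finished")
    }
    if centroidsFile == "" {
        // "fake finish": the job reports success but produced no centroids
        return errors.New("analyze task finished with empty centroids file")
    }
    return nil
}

func main() {
    fmt.Println(checkAnalyzeResult(true, ""))          // fake finish -> error
    fmt.Println(checkAnalyzeResult(true, "somewhere")) // <nil>
}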
 // fix: https://github.com/milvus-io/milvus/issues/35110
 func (s *ClusteringCompactionTaskSuite) TestCompleteTask() {
-    task := s.generateBasicTask()
+    task := s.generateBasicTask(false)
     task.completeTask()
     partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetCollectionID(), task.GetPartitionID(), task.GetChannel(), task.GetPlanID())
     s.True(partitionStats.GetCommitTime() > time.Now().Add(-2*time.Second).Unix())
@@ -370,7 +627,7 @@ const (
     FloatVecField = "floatVecField"
 )

-func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
+func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorClusteringKey bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
     // if fields are specified, construct it
     if len(fields) > 0 {
         return &schemapb.CollectionSchema{
@@ -382,15 +639,14 @@ func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorClusteringKey bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {

     // if no field is specified, use default
     pk := &schemapb.FieldSchema{
         FieldID:      100,
         Name:         Int64Field,
         IsPrimaryKey: true,
         Description:  "",
         DataType:     schemapb.DataType_Int64,
         TypeParams:   nil,
         IndexParams:  nil,
         AutoID:       autoID,
-        IsClusteringKey: true,
     }
     fVec := &schemapb.FieldSchema{
         FieldID: 101,
@@ -406,6 +662,13 @@ func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorClusteringKey bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
         },
         IndexParams: nil,
     }
+
+    if vectorClusteringKey {
+        fVec.IsClusteringKey = true
+    } else {
+        pk.IsClusteringKey = true
+    }
+
     return &schemapb.CollectionSchema{
         Name:   collection,
         AutoID: autoID,
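With the rename, one constructor covers both clustering-key shapes instead of hard-coding the scalar case. A usage sketch (collection name invented, dim and autoID as in the tests):

// Scalar clustering key: the Int64 primary key carries IsClusteringKey.
scalarSchema := ConstructClusteringSchema("demo_coll", 32, true, false)

// Vector clustering key: the float-vector field carries it instead, which
// is what routes a clustering compaction task through the analyze stage.
vectorSchema := ConstructClusteringSchema("demo_coll", 32, true, true)
_, _ = scalarSchema, vectorSchema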