mirror of https://github.com/milvus-io/milvus.git
enhance: improve ut for compaction_policy_clustering (#35205)
#34792
Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/35343/head
parent 634eadd0a1
commit bb15ecdc13
@@ -195,17 +195,16 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
    return false, 0
}

func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
    for _, s := range view.GetSegmentsView() {
        totalRows += s.NumOfRows
        segmentIDs = append(segmentIDs, s.ID)
    }
    clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
    clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()

    maxRows, err := calBySegmentSizePolicy(coll.Schema, expectedSegmentSize)
    if err != nil {
        return nil, 0, 0, 0, err
        return 0, 0, 0, err
    }
    maxSegmentRows = int64(float64(maxRows) * clusteringMaxSegmentSizeRatio)
    preferSegmentRows = int64(float64(maxRows) * clusteringPreferSegmentSizeRatio)
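The hunk above drops segmentIDs from calculateClusteringCompactionConfig's return values, so a caller now only gets row statistics back and is expected to take segment IDs from the view itself. A minimal sketch of the updated call shape, mirroring the CompactionTriggerManager.SubmitClusteringViewToScheduler call site further down in this diff (the surrounding context is an assumption, not part of this change):

    // Sketch: how the trimmed-down helper is consumed after this change.
    expectedSegmentSize := getExpectedSegmentSize(m.meta, collection)
    totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
    if err != nil {
        log.Warn("Failed to calculate cluster compaction config fail", zap.Error(err))
        return
    }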
@@ -224,7 +223,7 @@ func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collecti
        log.Info("New data is larger than threshold, do compaction", zap.Int64("newDataSize", newDataSize))
        return true, nil
    }
    log.Info("No partition stats and no enough new data, skip compaction")
    log.Info("No partition stats and no enough new data, skip compaction", zap.Int64("newDataSize", newDataSize))
    return false, nil
}

@@ -284,7 +283,6 @@ func (v *ClusteringSegmentsView) GetSegmentsView() []*SegmentView {
    if v == nil {
        return nil
    }

    return v.segments
}

@@ -305,11 +303,9 @@ func (v *ClusteringSegmentsView) String() string {
}

func (v *ClusteringSegmentsView) Trigger() (CompactionView, string) {
    // todo set reason
    return v, ""
}

func (v *ClusteringSegmentsView) ForceTrigger() (CompactionView, string) {
    // TODO implement me
    panic("implement me")
}
@@ -25,8 +25,12 @@ import (
    "github.com/stretchr/testify/suite"
    "go.uber.org/atomic"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/metastore/mocks"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestClusteringCompactionPolicySuite(t *testing.T) {
@@ -36,39 +40,63 @@ func TestClusteringCompactionPolicySuite(t *testing.T) {
type ClusteringCompactionPolicySuite struct {
    suite.Suite

    mockAlloc          *NMockAllocator
    mockTriggerManager *MockTriggerManager
    testLabel          *CompactionGroupLabel
    handler            *NMockHandler
    mockPlanContext    *MockCompactionPlanContext
    catalog            *mocks.DataCoordCatalog
    mockAlloc          *NMockAllocator
    mockTriggerManager *MockTriggerManager
    handler            *NMockHandler
    mockPlanContext    *MockCompactionPlanContext
    catalog            *mocks.DataCoordCatalog
    meta               *meta

    clusteringCompactionPolicy *clusteringCompactionPolicy
}

func (s *ClusteringCompactionPolicySuite) SetupTest() {
    s.testLabel = &CompactionGroupLabel{
        CollectionID: 1,
        PartitionID:  10,
        Channel:      "ch-1",
    }

    catalog := mocks.NewDataCoordCatalog(s.T())
    catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
    catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
    catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil).Maybe()
    catalog.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil).Maybe()
    catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil).Maybe()
    catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil).Maybe()
    s.catalog = catalog

    segments := genSegmentsForMeta(s.testLabel)
    meta := &meta{segments: NewSegmentsInfo()}
    for id, segment := range segments {
        meta.segments.SetSegment(id, segment)
    compactionTaskMeta, _ := newCompactionTaskMeta(context.TODO(), s.catalog)
    partitionStatsMeta, _ := newPartitionStatsMeta(context.TODO(), s.catalog)
    indexMeta, _ := newIndexMeta(context.TODO(), s.catalog)

    meta := &meta{
        segments:           NewSegmentsInfo(),
        collections:        make(map[UniqueID]*collectionInfo, 0),
        compactionTaskMeta: compactionTaskMeta,
        partitionStatsMeta: partitionStatsMeta,
        indexMeta:          indexMeta,
    }
    s.meta = meta

    mockAllocator := newMockAllocator()
    mockHandler := NewNMockHandler(s.T())
    s.handler = mockHandler
    s.clusteringCompactionPolicy = newClusteringCompactionPolicy(meta, mockAllocator, mockHandler)
    s.clusteringCompactionPolicy = newClusteringCompactionPolicy(s.meta, mockAllocator, mockHandler)
}

func (s *ClusteringCompactionPolicySuite) TestTrigger() {
func (s *ClusteringCompactionPolicySuite) TestEnable() {
    // by default
    s.False(s.clusteringCompactionPolicy.Enable())
    // enable
    enableAutoCompactionKey := paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key
    clusteringCompactionEnableKey := paramtable.Get().DataCoordCfg.ClusteringCompactionEnable.Key
    clusteringCompactionAutoEnableKey := paramtable.Get().DataCoordCfg.ClusteringCompactionAutoEnable.Key
    paramtable.Get().Save(enableAutoCompactionKey, "true")
    paramtable.Get().Save(clusteringCompactionEnableKey, "true")
    paramtable.Get().Save(clusteringCompactionAutoEnableKey, "true")
    defer paramtable.Get().Reset(enableAutoCompactionKey)
    defer paramtable.Get().Reset(clusteringCompactionEnableKey)
    defer paramtable.Get().Reset(clusteringCompactionAutoEnableKey)
    s.True(s.clusteringCompactionPolicy.Enable())
}

func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() {
    // trigger with no collections
    events, err := s.clusteringCompactionPolicy.Trigger()
    s.NoError(err)
    gotViews, ok := events[TriggerTypeClustering]
@@ -77,6 +105,92 @@ func (s *ClusteringCompactionPolicySuite) TestTrigger() {
    s.Equal(0, len(gotViews))
}

func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() {
    // valid collection
    s.meta.collections[1] = &collectionInfo{
        ID:     1,
        Schema: newTestScalarClusteringKeySchema(),
    }
    // deleted collection
    s.meta.collections[2] = &collectionInfo{
        ID:     2,
        Schema: newTestScalarClusteringKeySchema(),
    }
    s.clusteringCompactionPolicy.meta = s.meta

    s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) {
        if collectionID == 2 {
            return nil, errors.New("mock get collection fail error")
        }
        coll, exist := s.meta.collections[collectionID]
        if exist {
            return coll, nil
        }
        return nil, nil
    })

    // trigger
    events, err := s.clusteringCompactionPolicy.Trigger()
    s.NoError(err)
    gotViews, ok := events[TriggerTypeClustering]
    s.True(ok)
    s.NotNil(gotViews)
    s.Equal(0, len(gotViews))
}

func (s *ClusteringCompactionPolicySuite) TestCalculateClusteringCompactionConfig() {
    testCases := []struct {
        description       string
        coll              *collectionInfo
        view              CompactionView
        totalRows         int64
        maxSegmentRows    int64
        preferSegmentRows int64
        err               error
    }{
        {
            description: "",
            coll: &collectionInfo{
                Schema: &schemapb.CollectionSchema{
                    Fields: []*schemapb.FieldSchema{
                        {
                            DataType: schemapb.DataType_Int64,
                        },
                        {
                            DataType: schemapb.DataType_FloatVector,
                            TypeParams: []*commonpb.KeyValuePair{
                                {Key: common.DimKey, Value: "128"},
                            },
                        },
                    },
                },
            },
            view: &ClusteringSegmentsView{
                segments: []*SegmentView{
                    {
                        NumOfRows: 1000,
                    },
                },
            },
            totalRows:         int64(1000),
            maxSegmentRows:    int64(2064888),
            preferSegmentRows: int64(1651910),
            err:               nil,
        },
    }

    for _, test := range testCases {
        s.Run(test.description, func() {
            expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll)
            totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(test.coll, test.view, expectedSegmentSize)
            s.Equal(test.totalRows, totalRows)
            s.Equal(test.maxSegmentRows, maxSegmentRows)
            s.Equal(test.preferSegmentRows, preferSegmentRows)
            s.Equal(test.err, err)
        })
    }
}

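The expected values in the test case above can be reproduced by hand, assuming the stock defaults (an assumption, since the defaults are not shown in this diff): with a 1024 MB expected segment size, each row costs 8 B for the Int64 field plus 128 × 4 B for the 128-dimensional float vector, i.e. 520 B, so maxRows = 1073741824 / 520 ≈ 2064888; with a max-segment-size ratio of 1.0 and a prefer-segment-size ratio of 0.8, maxSegmentRows = 2064888 and preferSegmentRows = int64(2064888 × 0.8) = 1651910.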
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionAbnormal() {
    // mock error in handler.GetCollection
    s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, errors.New("mock Error")).Once()
@@ -100,11 +214,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKe
    }
    s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

    compactionTaskMeta := newTestCompactionTaskMeta(s.T())
    s.clusteringCompactionPolicy.meta = &meta{
        compactionTaskMeta: compactionTaskMeta,
    }
    compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
    s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
        TriggerID:    1,
        PlanID:       10,
        CollectionID: 100,
@@ -124,11 +234,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
    }
    s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

    compactionTaskMeta := newTestCompactionTaskMeta(s.T())
    s.clusteringCompactionPolicy.meta = &meta{
        compactionTaskMeta: compactionTaskMeta,
    }
    compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
    s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
        TriggerID:    1,
        PlanID:       10,
        CollectionID: 100,
@@ -143,10 +249,6 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {

func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() {
    s.Run("no collection is compacting", func() {
        compactionTaskMeta := newTestCompactionTaskMeta(s.T())
        s.clusteringCompactionPolicy.meta = &meta{
            compactionTaskMeta: compactionTaskMeta,
        }
        compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
        s.False(compacting)
        s.Equal(int64(0), triggerID)
@@ -191,6 +293,41 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting()
    })
}

func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNormal() {
    paramtable.Get().Save(Params.DataCoordCfg.ClusteringCompactionNewDataSizeThreshold.Key, "0")
    defer paramtable.Get().Reset(Params.DataCoordCfg.ClusteringCompactionNewDataSizeThreshold.Key)

    testLabel := &CompactionGroupLabel{
        CollectionID: 1,
        PartitionID:  10,
        Channel:      "ch-1",
    }

    s.meta.collections[testLabel.CollectionID] = &collectionInfo{
        ID:     testLabel.CollectionID,
        Schema: newTestScalarClusteringKeySchema(),
    }

    segments := genSegmentsForMeta(testLabel)
    for id, segment := range segments {
        s.meta.segments.SetSegment(id, segment)
    }

    s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) {
        coll, exist := s.meta.collections[collectionID]
        if exist {
            return coll, nil
        }
        return nil, nil
    })

    // trigger
    view, _, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 1, false)
    s.Equal(1, len(view))
    s.NoError(err)
    s.Equal(testLabel, view[0].GetGroupLabel())
}

func (s *ClusteringCompactionPolicySuite) TestGetExpectedSegmentSize() {
}
@@ -91,8 +91,8 @@ func (s *L0CompactionPolicySuite) TestTrigger() {
        ID   UniqueID
        PosT Timestamp

        LogSize  int64
        LogCount int
        DelatLogSize  int64
        DeltaLogCount int
    }{
        {500, 10000, 4 * MB, 1},
        {501, 10000, 4 * MB, 1},
@@ -103,7 +103,7 @@ func (s *L0CompactionPolicySuite) TestTrigger() {
    segments := make(map[int64]*SegmentInfo)
    for _, arg := range segArgs {
        info := genTestSegmentInfo(s.testLabel, arg.ID, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed)
        info.Deltalogs = genTestDeltalogs(arg.LogCount, arg.LogSize)
        info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize)
        info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
        segments[arg.ID] = info
    }
@@ -150,26 +150,30 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
        State commonpb.SegmentState
        PosT  Timestamp

        LogSize  int64
        LogCount int
        InsertLogSize  int64
        InsertLogCount int

        DelatLogSize  int64
        DeltaLogCount int
    }{
        {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
        {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
        {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
        {103, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 4 * MB, 1},
        {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 50000, 0, 0},
        {201, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 0, 0},
        {300, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 10000, 0, 0},
        {301, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 20000, 0, 0},
        {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0 * MB, 0, 4 * MB, 1},
        {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0 * MB, 0, 4 * MB, 1},
        {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0 * MB, 0, 4 * MB, 1},
        {103, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 0 * MB, 0, 4 * MB, 1},
        {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 50000, 10 * MB, 1, 0, 0},
        {201, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 10 * MB, 1, 0, 0},
        {300, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 10000, 10 * MB, 1, 0, 0},
        {301, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 20000, 10 * MB, 1, 0, 0},
    }

    segments := make(map[int64]*SegmentInfo)
    for _, arg := range segArgs {
        info := genTestSegmentInfo(label, arg.ID, arg.Level, arg.State)
        if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed {
            info.Deltalogs = genTestDeltalogs(arg.LogCount, arg.LogSize)
            info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize)
            info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
        }
        info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize)
        if info.State == commonpb.SegmentState_Growing {
            info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
        }
@@ -209,7 +213,7 @@ func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.S
    }
}

func genTestDeltalogs(logCount int, logSize int64) []*datapb.FieldBinlog {
func genTestBinlogs(logCount int, logSize int64) []*datapb.FieldBinlog {
    var binlogs []*datapb.Binlog

    for i := 0; i < logCount; i++ {
@@ -249,9 +249,10 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
}

func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, view CompactionView) {
    log := log.With(zap.String("view", view.String()))
    taskID, err := m.allocator.allocID(ctx)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()))
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
        return
    }

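In this and the following trigger-manager hunks, the view is bound into the logger once via log := log.With(zap.String("view", view.String())), so the individual Warn calls can drop their repeated zap.String("view", ...) arguments and report the error with zap.Error(err) instead.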
@@ -261,7 +262,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,

    collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()))
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.Error(err))
        return
    }

@@ -283,14 +284,14 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
    err = m.compactionHandler.enqueueCompaction(task)
    if err != nil {
        log.Warn("Failed to execute compaction task",
            zap.Int64("collection", task.CollectionID),
            zap.Int64("triggerID", task.GetTriggerID()),
            zap.Int64("planID", task.GetPlanID()),
            zap.Int64s("segmentIDs", task.GetInputSegments()),
            zap.Error(err))
        return
    }
    log.Info("Finish to submit a LevelZeroCompaction plan",
        zap.Int64("taskID", taskID),
        zap.Int64("triggerID", task.GetTriggerID()),
        zap.Int64("planID", task.GetPlanID()),
        zap.String("type", task.GetType().String()),
        zap.Int64s("L0 segments", levelZeroSegs),
@@ -298,24 +299,22 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
}

func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
    log := log.With(zap.String("view", view.String()))
    taskID, _, err := m.allocator.allocN(2)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()),
            zap.Error(err))
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
        return
    }
    collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()),
            zap.Error(err))
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.Error(err))
        return
    }

    expectedSegmentSize := m.getExpectedSegmentSize(collection)

    _, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
    expectedSegmentSize := getExpectedSegmentSize(m.meta, collection)
    totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
    if err != nil {
        log.Warn("Failed to calculate cluster compaction config fail", zap.String("view", view.String()), zap.Error(err))
        log.Warn("Failed to calculate cluster compaction config fail", zap.Error(err))
        return
    }

@@ -349,30 +348,28 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
    err = m.compactionHandler.enqueueCompaction(task)
    if err != nil {
        log.Warn("Failed to execute compaction task",
            zap.Int64("collection", task.CollectionID),
            zap.Int64("planID", task.GetPlanID()),
            zap.Int64s("segmentIDs", task.GetInputSegments()),
            zap.Error(err))
        return
    }
    log.Info("Finish to submit a clustering compaction task",
        zap.Int64("taskID", taskID),
        zap.Int64("triggerID", task.GetTriggerID()),
        zap.Int64("planID", task.GetPlanID()),
        zap.String("type", task.GetType().String()),
        zap.Int64("MaxSegmentRows", task.MaxSegmentRows),
        zap.Int64("PreferSegmentRows", task.PreferSegmentRows),
    )
}

func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView) {
    log := log.With(zap.String("view", view.String()))
    taskID, _, err := m.allocator.allocN(2)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()))
        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
        return
    }
    collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
    if err != nil {
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()))
        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.Error(err))
        return
    }
    var totalRows int64 = 0
@@ -399,20 +396,20 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
    err = m.compactionHandler.enqueueCompaction(task)
    if err != nil {
        log.Warn("Failed to execute compaction task",
            zap.Int64("collection", task.CollectionID),
            zap.Int64("triggerID", task.GetTriggerID()),
            zap.Int64("planID", task.GetPlanID()),
            zap.Int64s("segmentIDs", task.GetInputSegments()),
            zap.Error(err))
    }
    log.Info("Finish to submit a single compaction task",
        zap.Int64("taskID", taskID),
        zap.Int64("triggerID", task.GetTriggerID()),
        zap.Int64("planID", task.GetPlanID()),
        zap.String("type", task.GetType().String()),
    )
}

func (m *CompactionTriggerManager) getExpectedSegmentSize(collection *collectionInfo) int64 {
    indexInfos := m.meta.indexMeta.GetIndexesForCollection(collection.ID, "")
func getExpectedSegmentSize(meta *meta, collection *collectionInfo) int64 {
    indexInfos := meta.indexMeta.GetIndexesForCollection(collection.ID, "")

    vectorFields := typeutil.GetVectorFieldSchemas(collection.Schema)
    fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
@@ -212,7 +212,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
            },
        }

        s.Equal(int64(200*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
        s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
    })

    s.Run("HNSW & DISKANN", func() {
@@ -267,7 +267,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
            },
        }

        s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
        s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
    })

    s.Run("some vector has no index", func() {
@@ -308,6 +308,6 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
            },
        }

        s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
        s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
    })
}