enhance: enable manual compaction for collections without indexes (#36577)

issue: #36576

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/36696/head^2
jaime 2024-10-08 19:57:18 +08:00 committed by GitHub
parent 9fdf0505a8
commit ef1832ff9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 146 additions and 21 deletions

View File

@ -100,7 +100,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
views := make([]CompactionView, 0)
for _, group := range partSegments {
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
group.segments = FilterInIndexedSegments(policy.handler, policy.meta, group.segments...)
group.segments = FilterInIndexedSegments(policy.handler, policy.meta, false, group.segments...)
}
collectionTTL, err := getCollectionTTL(collection.Properties)

View File

@ -335,7 +335,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
}
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...)
group.segments = FilterInIndexedSegments(t.handler, t.meta, signal.isForce, group.segments...)
}
coll, err := t.getCollection(group.collectionID)
@ -693,7 +693,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
segments = FilterInIndexedSegments(t.handler, t.meta, segments...)
segments = FilterInIndexedSegments(t.handler, t.meta, false, segments...)
}
var res []*SegmentInfo

View File

@ -96,6 +96,100 @@ func newMockVersionManager() IndexEngineVersionManager {
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func Test_compactionTrigger_force_without_index(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
collectionID := int64(11)
binlogs := []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
}
deltaLogs := []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
}
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}
m := &meta{
catalog: catalog,
channelCPs: newChannelCps(),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: collectionID,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
Deltalogs: deltaLogs,
IsSorted: true,
},
},
},
},
indexMeta: &indexMeta{
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
},
collections: map[int64]*collectionInfo{
collectionID: {
ID: collectionID,
Schema: schema,
},
},
}
compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m}
tr := &compactionTrigger{
meta: m,
handler: newMockHandlerWithMeta(m),
allocator: newMock0Allocator(t),
signals: nil,
compactionHandler: compactionHandler,
globalTrigger: nil,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
_, err := tr.triggerManualCompaction(collectionID)
assert.NoError(t, err)
select {
case val := <-compactionHandler.spyChan:
assert.Equal(t, 1, len(val.SegmentBinlogs))
return
case <-time.After(3 * time.Second):
assert.Fail(t, "failed to get plan")
return
}
}
func Test_compactionTrigger_force(t *testing.T) {
paramtable.Init()
type fields struct {
@ -848,6 +942,9 @@ func Test_compactionTrigger_noplan(t *testing.T) {
Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4"
vecFieldID := int64(201)
mock0Allocator := newMockAllocator(t)
im := newSegmentIndexMeta(nil)
im.indexes[2] = make(map[UniqueID]*model.Index)
tests := []struct {
name string
fields fields
@ -859,7 +956,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
"test no plan",
fields{
&meta{
indexMeta: newSegmentIndexMeta(nil),
indexMeta: im,
// 4 segment
channelCPs: newChannelCps(),

View File

@ -440,7 +440,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
droppedCompactTo[to] = struct{}{}
}
}
indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, lo.Keys(droppedCompactTo)...)
indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
indexedSet := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexedSet.Insert(segment.GetID())

View File

@ -129,7 +129,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())
validSegmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
unIndexedIDs := make(typeutil.UniqueSet)

View File

@ -988,3 +988,17 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect
allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
return allDiskIndex
}
func (m *indexMeta) HasIndex(collectionID int64) bool {
m.RLock()
defer m.RUnlock()
indexes, ok := m.indexes[collectionID]
if ok {
for _, index := range indexes {
if !index.IsDeleted {
return true
}
}
}
return false
}

View File

@ -72,7 +72,7 @@ func VerifyResponse(response interface{}, err error) error {
}
}
func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo {
func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo {
if len(segments) == 0 {
return nil
}
@ -83,6 +83,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
ret := make([]*SegmentInfo, 0)
for collection, segmentList := range collectionSegments {
// No segments will be filtered if there are no indices in the collection.
if skipNoIndexCollection && !mt.indexMeta.HasIndex(collection) {
ret = append(ret, segmentList...)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
coll, err := handler.GetCollection(ctx, collection)
cancel()

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
@ -71,6 +72,17 @@ func (s *CompactionSuite) TestMixCompaction() {
s.NoError(err)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// show collection
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
err = merr.CheckRPCCall(showCollectionsResp, err)
@ -110,22 +122,13 @@ func (s *CompactionSuite) TestMixCompaction() {
log.Info("insert done", zap.Int("i", i))
}
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
// stats task happened
s.Equal(rowNum/batch, len(segments)/2)
// The stats task of segments will create a new segment, potentially triggering compaction simultaneously,
// which may lead to an increase or decrease in the number of segments.
s.True(len(segments) > 0)
for _, segment := range segments {
log.Info("show segment result", zap.String("segment", segment.String()))
}
@ -135,16 +138,21 @@ func (s *CompactionSuite) TestMixCompaction() {
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
compactFromSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Dropped
})
compactToSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Flushed
})
log.Info("ShowSegments result", zap.Int("len(compactFromSegments)", len(compactFromSegments)),
zap.Int("len(compactToSegments)", len(compactToSegments)))
return len(compactToSegments) == 1
// The small segments can be merged based on dataCoord.compaction.min.segment
return len(compactToSegments) <= paramtable.Get().DataCoordCfg.MinSegmentToMerge.GetAsInt()
}
for !showSegments() {
select {
case <-ctx.Done():