diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 30f8b32d32..23fb765c8b 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -271,11 +271,11 @@ func (t *compactionTrigger) estimateDiskSegmentMaxNumOfRows(collectionID UniqueI
     return t.estimateDiskSegmentPolicy(collMeta.Schema)
 }
 
-func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) error {
+func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
     ctx := context.Background()
 
     if len(segments) == 0 {
-        return nil
+        return false, nil
     }
 
     collectionID := segments[0].GetCollectionID()
@@ -284,24 +284,26 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) error
         IndexName:    "",
     })
     if err != nil {
-        return err
+        return false, err
     }
 
+    isDiskIndex := false
     for _, indexInfo := range resp.IndexInfos {
         indexParamsMap := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
         if indexType, ok := indexParamsMap["index_type"]; ok {
             if indexType == indexparamcheck.IndexDISKANN {
                 diskSegmentMaxRows, err := t.estimateDiskSegmentMaxNumOfRows(collectionID)
                 if err != nil {
-                    return err
+                    return false, err
                 }
                 for _, segment := range segments {
                     segment.MaxRowNum = int64(diskSegmentMaxRows)
                 }
+                isDiskIndex = true
             }
         }
     }
-    return nil
+    return isDiskIndex, nil
 }
 
 func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
@@ -336,7 +338,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
 
         group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
 
-        err := t.updateSegmentMaxSize(group.segments)
+        isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
         if err != nil {
             log.Warn("failed to update segment max size,", zap.Error(err))
             continue
@@ -351,7 +353,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
             return
         }
 
-        plans := t.generatePlans(group.segments, signal.isForce, ct)
+        plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)
         for _, plan := range plans {
             segIDs := fetchSegIDs(plan.GetSegmentBinlogs())
@@ -419,7 +421,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
         return
     }
 
-    err := t.updateSegmentMaxSize(segments)
+    isDiskIndex, err := t.updateSegmentMaxSize(segments)
     if err != nil {
         log.Warn("failed to update segment max size", zap.Error(err))
     }
@@ -438,7 +440,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
         return
     }
 
-    plans := t.generatePlans(segments, signal.isForce, ct)
+    plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
     for _, plan := range plans {
         if t.compactionHandler.isFull() {
             log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
@@ -467,7 +469,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
     }
 }
 
-func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime) []*datapb.CompactionPlan {
+func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan {
     // find segments need internal compaction
     // TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively
     var prioritizedCandidates []*SegmentInfo
@@ -478,7 +480,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
     for _, segment := range segments {
         segment := segment.ShadowClone()
         // TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
-        if force || t.ShouldDoSingleCompaction(segment, compactTime) {
+        if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) {
             prioritizedCandidates = append(prioritizedCandidates, segment)
         } else if t.isSmallSegment(segment) {
             smallCandidates = append(smallCandidates, segment)
@@ -741,24 +743,43 @@ func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool {
     return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration
 }
 
-func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
-    // count all the binlog file count
-    var totalLogNum int
+func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDiskIndex bool, compactTime *compactTime) bool {
+    // the binlog count is no longer restricted here, because it scales with the number of fields
+    var binLog int
     for _, binlogs := range segment.GetBinlogs() {
-        totalLogNum += len(binlogs.GetBinlogs())
+        binLog += len(binlogs.GetBinlogs())
     }
 
+    // count statslog files, only for flush-generated segments
+    if len(segment.CompactionFrom) == 0 {
+        var statsLog int
+        for _, statsLogs := range segment.GetStatslogs() {
+            statsLog += len(statsLogs.GetBinlogs())
+        }
+
+        var maxSize int
+        if isDiskIndex {
+            maxSize = int(Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
+        } else {
+            maxSize = int(Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
+        }
+
+        // if there are more statslogs than expected, trigger compaction to reduce the statslog count.
+        // TODO maybe we want to compact into a single statslog to reduce the watch dml channel cost
+        // TODO avoid rebuilding the index twice.
+        if statsLog > maxSize*2.0 {
+            log.Info("stats number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
+            return true
+        }
+    }
+
+    var deltaLog int
     for _, deltaLogs := range segment.GetDeltalogs() {
-        totalLogNum += len(deltaLogs.GetBinlogs())
+        deltaLog += len(deltaLogs.GetBinlogs())
     }
 
-    for _, statsLogs := range segment.GetStatslogs() {
-        totalLogNum += len(statsLogs.GetBinlogs())
-    }
-    // avoid segment has too many bin logs and the etcd meta is too large, force trigger compaction
-    if totalLogNum > Params.DataCoordCfg.SingleCompactionBinlogMaxNum.GetAsInt() {
-        log.Info("total binlog number is too much, trigger compaction", zap.Int64("segment", segment.ID),
-            zap.Int("Delta logs", len(segment.GetDeltalogs())), zap.Int("Bin Logs", len(segment.GetBinlogs())), zap.Int("Stat logs", len(segment.GetStatslogs())))
+    if deltaLog > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
+        log.Info("total delta number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
         return true
     }
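The new statslog check above bounds the number of statslog files at roughly twice the number of binlogs a full segment is expected to produce. Below is a minimal sketch of that arithmetic, assuming the 64 MB binlog cap this patch introduces (datanode.segment.binlog.maxsize) and illustrative segment size caps of 512 MB and 2048 MB; the helper name and concrete values are only for illustration and are not part of the patch.

```go
package main

import "fmt"

// statslogThreshold mirrors the maxSize*2 bound used in ShouldDoSingleCompaction:
// how many binlogs a full segment is expected to hold, times two.
func statslogThreshold(segmentMaxSizeMB, binlogMaxSizeBytes int64) int {
	maxSize := int(segmentMaxSizeMB * 1024 * 1024 / binlogMaxSizeBytes)
	return maxSize * 2
}

func main() {
	// e.g. a 512 MB segment cap with 64 MB binlogs tolerates up to
	// 2*8 = 16 statslog files before single compaction is forced.
	fmt.Println(statslogThreshold(512, 64*1024*1024))
	// an assumed 2048 MB disk-index segment cap tolerates up to 2*32 = 64.
	fmt.Println(statslogThreshold(2048, 64*1024*1024))
}
```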
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index d5e41e1df6..51caf3af12 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -825,6 +825,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
         compactTime *compactTime
     }
     Params.Init()
+    Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4"
     vecFieldID := int64(201)
     tests := []struct {
         name string
@@ -1522,9 +1523,9 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
     trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(),
         &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
 
-    // Test too many files.
+    // Test too many deltalogs.
     var binlogs []*datapb.FieldBinlog
-    for i := UniqueID(0); i < 5000; i++ {
+    for i := UniqueID(0); i < 1000; i++ {
         binlogs = append(binlogs, &datapb.FieldBinlog{
             Binlogs: []*datapb.Binlog{
                 {EntriesNum: 5, LogPath: "log1", LogSize: 100},
@@ -1541,13 +1542,46 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
             MaxRowNum:      300,
             InsertChannel:  "ch1",
             State:          commonpb.SegmentState_Flushed,
-            Binlogs:        binlogs,
+            Deltalogs:      binlogs,
         },
     }
 
-    couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{travelTime: 200, expireTime: 0})
+    couldDo := trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
     assert.True(t, couldDo)
 
+    // Test too many stats logs
+    info = &SegmentInfo{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:             1,
+            CollectionID:   2,
+            PartitionID:    1,
+            LastExpireTime: 100,
+            NumOfRows:      100,
+            MaxRowNum:      300,
+            InsertChannel:  "ch1",
+            State:          commonpb.SegmentState_Flushed,
+            Statslogs:      binlogs,
+        },
+    }
+
+    couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
+    assert.True(t, couldDo)
+
+    couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
+    assert.True(t, couldDo)
+
+    // with only 20 stats logs, a disk index does not trigger compaction (a non-disk index still does)
+    info.Statslogs = binlogs[0:20]
+    couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
+    assert.True(t, couldDo)
+
+    couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
+    assert.False(t, couldDo)
+
+    // Test too many stats logs, but the segment was produced by compaction
+    info.CompactionFrom = []int64{0, 1}
+    couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
+    assert.False(t, couldDo)
+
     //Test expire triggered compaction
     var binlogs2 []*datapb.FieldBinlog
     for i := UniqueID(0); i < 100; i++ {
@@ -1580,15 +1614,15 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
     }
 
     // expire time < Timestamp To
-    couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 300})
+    couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 300})
     assert.False(t, couldDo)
 
     // didn't reach single compaction size 10 * 1024 * 1024
-    couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 600})
+    couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 600})
     assert.False(t, couldDo)
 
     // expire time < Timestamp False
-    couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 1200})
+    couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 1200})
     assert.True(t, couldDo)
 
     // Test Delete triggered compaction
@@ -1623,11 +1657,11 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
     }
 
     // expire time < Timestamp To
-    couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{travelTime: 600, expireTime: 0})
+    couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 600, expireTime: 0})
     assert.False(t, couldDo)
 
     // deltalog is large enough, should do compaction
-    couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{travelTime: 800, expireTime: 0})
+    couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 800, expireTime: 0})
     assert.True(t, couldDo)
 }
diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go
index 9c738878d8..44c3b2f6ce 100644
--- a/internal/datanode/buffer.go
+++ b/internal/datanode/buffer.go
@@ -31,6 +31,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/internal/util/paramtable"
+    "github.com/milvus-io/milvus/internal/util/typeutil"
 )
 
 // DelBufferManager is in charge of managing insertBuf and delBuf from an overall prospect
@@ -332,32 +333,20 @@ func (ddb *DelDataBuf) updateStartAndEndPosition(startPos *internalpb.MsgPositio
 // * This need to change for string field support and multi-vector fields support.
 func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) {
     // Get Dimension
-    // TODO GOOSE: under assumption that there's only 1 Vector field in one collection schema
-    var vectorSize int
-    for _, field := range collSchema.Fields {
-        if field.DataType == schemapb.DataType_FloatVector ||
-            field.DataType == schemapb.DataType_BinaryVector {
-
-            dimension, err := storage.GetDimFromParams(field.TypeParams)
-            switch field.DataType {
-            case schemapb.DataType_FloatVector:
-                vectorSize = dimension * 4
-            case schemapb.DataType_BinaryVector:
-                vectorSize = dimension / 8
-            }
-            if err != nil {
-                log.Error("failed to get dim from field", zap.Error(err))
-                return nil, err
-            }
-            break
-        }
+    size, err := typeutil.EstimateSizePerRecord(collSchema)
+    if err != nil {
+        log.Warn("failed to estimate size per record", zap.Error(err))
+        return nil, err
     }
 
-    if vectorSize == 0 {
-        return nil, errors.New("Invalid dimension")
+    if size == 0 {
+        return nil, errors.New("Invalid schema")
     }
 
-    limit := Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(vectorSize)
+    limit := Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(size)
+    if Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()%int64(size) != 0 {
+        limit++
+    }
 
     //TODO::xige-16 eval vec and string field
     return &BufferData{
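With this change the insert buffer's row limit is derived from the estimated width of a whole row rather than only the vector field, and it rounds up so a buffer that does not divide evenly still gets a limit. A minimal sketch of that rounding, with made-up sizes (a 16 MiB buffer and 132-byte rows) standing in for the paramtable lookup and the typeutil.EstimateSizePerRecord result:

```go
package main

import "fmt"

// rowLimit mirrors the round-up division newBufferData performs:
// buffer size divided by estimated bytes per row, plus one when it
// does not divide evenly.
func rowLimit(bufferSize, rowSize int64) int64 {
	limit := bufferSize / rowSize
	if bufferSize%rowSize != 0 {
		limit++
	}
	return limit
}

func main() {
	// 16 MiB buffer with 132-byte rows: 16777216/132 = 127100 remainder 16,
	// so the limit rounds up to 127101 rows.
	fmt.Println(rowLimit(16*1024*1024, 132))
}
```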
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 0b27eec5c1..362f79c8e8 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -240,12 +240,10 @@ func (t *compactionTask) merge(
     mergeStart := time.Now()
 
     var (
-        dim              int   // dimension of float/binary vector field
         maxRowsPerBinlog int   // maximum rows populating one binlog
         numBinlogs       int   // binlog number
         numRows          int64 // the number of rows uploaded
         expired          int64 // the number of expired entity
-        err              error
 
         // statslog generation
         pkID   UniqueID
@@ -300,25 +298,25 @@ func (t *compactionTask) merge(
             pkID = fs.GetFieldID()
             pkType = fs.GetDataType()
         }
-        if fs.GetDataType() == schemapb.DataType_FloatVector ||
-            fs.GetDataType() == schemapb.DataType_BinaryVector {
-            for _, t := range fs.GetTypeParams() {
-                if t.Key == "dim" {
-                    if dim, err = strconv.Atoi(t.Value); err != nil {
-                        log.Warn("strconv wrong on get dim", zap.Error(err))
-                        return nil, nil, 0, err
-                    }
-                    break
-                }
-            }
-        }
+    }
+
+    // estimate rows per binlog
+    // TODO we should not convert size to rows and back, because the actual size is already known; this is especially important for varchar types.
+    size, err := typeutil.EstimateSizePerRecord(meta.GetSchema())
+    if err != nil {
+        log.Warn("failed to estimate size per record", zap.Error(err))
+        return nil, nil, 0, err
     }
+
+    maxRowsPerBinlog = int(Params.DataNodeCfg.BinLogMaxSize.GetAsInt64() / int64(size))
+    if Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()%int64(size) != 0 {
+        maxRowsPerBinlog++
+    }
+
     expired = 0
     numRows = 0
     numBinlogs = 0
     currentTs := t.GetCurrentTime()
-    maxRowsPerBinlog = int(Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / (int64(dim) * 4))
     currentRows := 0
     downloadTimeCost := time.Duration(0)
     uploadInsertTimeCost := time.Duration(0)
@@ -327,14 +325,14 @@ func (t *compactionTask) merge(
         downloadStart := time.Now()
         data, err := t.download(ctxTimeout, path)
         if err != nil {
-            log.Warn("download insertlogs wrong")
+            log.Warn("download insertlogs wrong", zap.Error(err))
             return nil, nil, 0, err
         }
         downloadTimeCost += time.Since(downloadStart)
 
         iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
         if err != nil {
-            log.Warn("new insert binlogs Itr wrong")
+            log.Warn("new insert binlogs Itr wrong", zap.Error(err))
             return nil, nil, 0, err
         }
         for iter.HasNext() {
@@ -370,11 +368,11 @@ func (t *compactionTask) merge(
             }
 
             currentRows++
-
-            if currentRows == maxRowsPerBinlog {
+            if currentRows >= maxRowsPerBinlog {
                 uploadInsertStart := time.Now()
                 inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
                 if err != nil {
+                    log.Warn("failed to upload single insert log", zap.Error(err))
                     return nil, nil, 0, err
                 }
                 uploadInsertTimeCost += time.Since(uploadInsertStart)
@@ -392,6 +390,7 @@ func (t *compactionTask) merge(
         uploadInsertStart := time.Now()
         inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
         if err != nil {
+            log.Warn("failed to upload single insert log", zap.Error(err))
            return nil, nil, 0, err
         }
         uploadInsertTimeCost += time.Since(uploadInsertStart)
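The merge path now sizes binlogs from the estimated width of an entire row (typeutil.EstimateSizePerRecord over the collection schema) instead of dim * 4 of the vector field alone, capped by the new datanode.segment.binlog.maxsize. A rough, self-contained sketch of that calculation follows; the per-field byte counts are assumptions for an int64 primary key, an int64 timestamp and a 128-dim float vector, not output of the real helper.

```go
package main

import "fmt"

// estimatedRowSize is a simplified stand-in for typeutil.EstimateSizePerRecord:
// the sum of fixed per-field widths for an assumed schema. The real helper walks
// the schema and also handles varchar and other variable-width types.
func estimatedRowSize() int64 {
	const (
		pkField     = 8       // int64 primary key
		tsField     = 8       // int64 timestamp
		floatVector = 128 * 4 // 128-dim float32 vector
	)
	return pkField + tsField + floatVector // 528 bytes
}

func main() {
	binlogMaxSize := int64(64 * 1024 * 1024) // assumed datanode.segment.binlog.maxsize default
	rows := binlogMaxSize / estimatedRowSize()
	if binlogMaxSize%estimatedRowSize() != 0 {
		rows++
	}
	fmt.Println(rows) // 127101 rows per binlog for this schema
}
```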
"200") defer func() { Params.DataNodeCfg.FlushInsertBufferSize = tmp }() @@ -465,7 +465,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { t.Run("Auto with manual flush", func(t *testing.T) { tmp := Params.DataNodeCfg.FlushInsertBufferSize - paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16") + paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200") defer func() { Params.DataNodeCfg.FlushInsertBufferSize = tmp }() @@ -607,7 +607,7 @@ func TestRollBF(t *testing.T) { t.Run("Pure roll BF", func(t *testing.T) { tmp := Params.DataNodeCfg.FlushInsertBufferSize - paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16") + paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200") defer func() { Params.DataNodeCfg.FlushInsertBufferSize = tmp }() @@ -697,7 +697,7 @@ func (s *InsertBufferNodeSuit) SetupSuite() { s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() // change flushing size to 2 - paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16") + paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200") } func (s *InsertBufferNodeSuit) TearDownSuite() { diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 693664daf4..1656dc6f41 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1252,7 +1252,7 @@ type dataCoordConfig struct { SingleCompactionRatioThreshold ParamItem SingleCompactionDeltaLogMaxSize ParamItem SingleCompactionExpiredLogMaxSize ParamItem - SingleCompactionBinlogMaxNum ParamItem + SingleCompactionDeltalogMaxNum ParamItem GlobalCompactionInterval ParamItem // Garbage Collection @@ -1338,7 +1338,7 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.MinSegmentToMerge = ParamItem{ Key: "dataCoord.compaction.min.segment", Version: "2.0.0", - DefaultValue: "4", + DefaultValue: "3", } p.MinSegmentToMerge.Init(base.mgr) @@ -1405,12 +1405,12 @@ func (p *dataCoordConfig) init(base *BaseTable) { } p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) - p.SingleCompactionBinlogMaxNum = ParamItem{ - Key: "dataCoord.compaction.single.binlog.maxnum", + p.SingleCompactionDeltalogMaxNum = ParamItem{ + Key: "dataCoord.compaction.single.deltalog.maxnum", Version: "2.0.0", DefaultValue: "1000", } - p.SingleCompactionBinlogMaxNum.Init(base.mgr) + p.SingleCompactionDeltalogMaxNum.Init(base.mgr) p.GlobalCompactionInterval = ParamItem{ Key: "dataCoord.compaction.global.interval", @@ -1464,6 +1464,7 @@ type dataNodeConfig struct { // segment FlushInsertBufferSize ParamItem FlushDeleteBufferBytes ParamItem + BinLogMaxSize ParamItem SyncPeriod ParamItem // io concurrency to fetch stats logs @@ -1501,6 +1502,13 @@ func (p *dataNodeConfig) init(base *BaseTable) { } p.FlushDeleteBufferBytes.Init(base.mgr) + p.BinLogMaxSize = ParamItem{ + Key: "datanode.segment.binlog.maxsize", + Version: "2.0.0", + DefaultValue: "67108864", + } + p.BinLogMaxSize.Init(base.mgr) + p.SyncPeriod = ParamItem{ Key: "datanode.segment.syncPeriod", Version: "2.0.0",