enhance: Determine the number of buffers based on the resource limits of the DataNode (#38209)

issue: #28410

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/38028/head^2
cai.zhang 2024-12-08 18:02:40 +08:00 committed by GitHub
parent 32f575be0f
commit 41b19c6b1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 189 additions and 43 deletions

View File

@ -683,7 +683,7 @@ dataNode:
slot:
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
workPoolSize: 8 # worker pool size for one clustering compaction job.
bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM
storage:

View File

@ -60,6 +60,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
expectedBinlogSize = 16 * 1024 * 1024
)
var _ Compactor = (*clusteringCompactionTask)(nil)
type clusteringCompactionTask struct {
@ -360,6 +364,71 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
return nil
}
func splitCentroids(centroids []int, num int) ([][]int, map[int]int) {
if num <= 0 {
return nil, nil
}
result := make([][]int, num)
resultIndex := make(map[int]int, len(centroids))
listLen := len(centroids)
for i := 0; i < listLen; i++ {
group := i % num
result[group] = append(result[group], centroids[i])
resultIndex[i] = group
}
return result, resultIndex
}
func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids []*schemapb.VectorField) error {
centroidsOffset := make([]int, len(centroids))
for i := 0; i < len(centroids); i++ {
centroidsOffset[i] = i
}
centroidGroups, groupIndex := splitCentroids(centroidsOffset, bufferNum)
for id, group := range centroidGroups {
fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
if err != nil {
return err
}
centroidValues := make([]storage.VectorFieldValue, len(group))
for i, offset := range group {
centroidValues[i] = storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroids[offset])
}
fieldStats.SetVectorCentroids(centroidValues...)
clusterBuffer := &ClusterBuffer{
id: id,
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
}
if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
return err
}
t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
}
t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
centroidGroupOffset := groupIndex[int(idMapping[offset])]
return t.clusterBuffers[centroidGroupOffset]
}
return nil
}
func (t *clusteringCompactionTask) switchPolicyForVectorPlan(centroids *clusteringpb.ClusteringCentroidsStats) error {
bufferNum := len(centroids.GetCentroids())
bufferNumByMemory := int(t.memoryBufferSize / expectedBinlogSize)
if bufferNumByMemory < bufferNum {
bufferNum = bufferNumByMemory
}
return t.generatedVectorPlan(bufferNum, centroids.GetCentroids())
}
func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
defer span.End()
@ -385,29 +454,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
zap.Int("centroidNum", len(centroids.GetCentroids())),
zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))
for id, centroid := range centroids.GetCentroids() {
fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
if err != nil {
return err
}
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
}
if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
return err
}
t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
}
t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
return t.clusterBuffers[idMapping[offset]]
}
return nil
return t.switchPolicyForVectorPlan(centroids)
}
// mapping read and split input segments into buffers
@ -576,6 +623,7 @@ func (t *clusteringCompactionTask) mappingSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}
var offset int64 = -1
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
@ -592,7 +640,6 @@ func (t *clusteringCompactionTask) mappingSegment(
return err
}
var offset int64 = -1
for {
err := pkIter.Next()
if err != nil {
@ -1105,6 +1152,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
zap.Int64("collectionID", t.GetCollection()),
zap.Int64("partitionID", t.partitionID),
zap.Int("segments", len(inputSegments)),
zap.Int("clustering num", len(analyzeDict)),
zap.Duration("elapse", time.Since(analyzeStart)))
return analyzeDict, nil
}
@ -1219,25 +1267,12 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
return analyzeResult, nil
}
func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
keys := lo.MapToSlice(dict, func(k interface{}, _ int64) interface{} {
return k
})
notNullKeys := lo.Filter(keys, func(i interface{}, j int) bool {
return i != nil
})
sort.Slice(notNullKeys, func(i, j int) bool {
return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[j]))
})
func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
buckets := make([][]interface{}, 0)
currentBucket := make([]interface{}, 0)
var currentBucketSize int64 = 0
maxRows := t.plan.MaxSegmentRows
preferRows := t.plan.PreferSegmentRows
containsNull := len(keys) > len(notNullKeys)
for _, key := range notNullKeys {
for _, key := range keys {
// todo can optimize
if dict[key] > preferRows {
if len(currentBucket) != 0 {
buckets = append(buckets, currentBucket)
@ -1260,7 +1295,38 @@ func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{
}
}
buckets = append(buckets, currentBucket)
return buckets, containsNull
return buckets
}
func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
bufferNumByMemory := t.memoryBufferSize / expectedBinlogSize
log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
zap.Int64("bufferNumByMemory", bufferNumByMemory))
if bufferNumByMemory > bufferNumBySegmentMaxRows {
return t.generatedScalarPlan(t.plan.GetMaxSegmentRows(), t.plan.GetPreferSegmentRows(), keys, dict)
}
maxRows := totalRows / bufferNumByMemory
return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()), keys, dict)
}
func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
totalRows := int64(0)
keys := lo.MapToSlice(dict, func(k interface{}, v int64) interface{} {
totalRows += v
return k
})
notNullKeys := lo.Filter(keys, func(i interface{}, j int) bool {
return i != nil
})
sort.Slice(notNullKeys, func(i, j int) bool {
return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[j]))
})
return t.switchPolicyForScalarPlan(totalRows, notNullKeys, dict), len(keys) > len(notNullKeys)
}
func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {

View File

@ -19,6 +19,7 @@ package compaction
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -237,6 +238,85 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.Equal(totalRowNum, statsRowNum)
}
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 10240; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRow(int64(i)),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
segWriter.FlushAndIsFull()
kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
var one sync.Once
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, strings []string) ([][]byte, error) {
// 32m, only two buffers can be generated
one.Do(func() {
s.task.memoryBufferSize = 32 * 1024 * 1024
})
return lo.Values(kvs), nil
})
s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
},
}
s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 3000
s.task.plan.MaxSegmentRows = 3000
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
}
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(2, len(s.task.clusterBuffers))
s.Equal(4, len(compactionResult.GetSegments()))
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
for _, b := range fb.GetBinlogs() {
totalBinlogNum++
if fb.GetFieldID() == 100 {
totalRowNum += b.GetEntriesNum()
}
}
}
statsBinlogNum := 0
statsRowNum := int64(0)
for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
for _, b := range sb.GetBinlogs() {
statsBinlogNum++
statsRowNum += b.GetEntriesNum()
}
}
s.Equal(3, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
}
func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
schema := genCollectionSchemaWithBM25()
var segmentID int64 = 1001

View File

@ -4643,7 +4643,7 @@ if this parameter <= 0, will set it as 10`,
Key: "dataNode.clusteringCompaction.memoryBufferRatio",
Version: "2.4.6",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.",
DefaultValue: "0.1",
DefaultValue: "0.3",
PanicIfEmpty: false,
Export: true,
}