mirror of https://github.com/milvus-io/milvus.git
fix: Fix memory buffer error & some renaming (#33850)
#30633
Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/34066/head
parent 2f691f1e67
commit 380d3f4469
@@ -61,19 +61,21 @@ type clusteringCompactionTask struct {
 	binlogIO  io.BinlogIO
 	allocator allocator.Allocator

-	ctx     context.Context
-	cancel  context.CancelFunc
-	done    chan struct{}
-	tr      *timerecord.TimeRecorder
+	ctx         context.Context
+	cancel      context.CancelFunc
+	done        chan struct{}
+	tr          *timerecord.TimeRecorder
+	mappingPool *conc.Pool[any]
+	flushPool   *conc.Pool[any]

 	plan *datapb.CompactionPlan

-	// schedule
-	spillChan chan SpillSignal
-	pool      *conc.Pool[any]
+	// flush
+	flushMutex sync.Mutex
+	flushCount *atomic.Int64
+	flushChan  chan SpillSignal

 	// metrics
-	spillCount    *atomic.Int64
 	writtenRowNum *atomic.Int64

 	// inner field
@@ -84,7 +86,6 @@ type clusteringCompactionTask struct {
 	clusteringKeyField *schemapb.FieldSchema
 	primaryKeyField    *schemapb.FieldSchema

-	spillMutex         sync.Mutex
 	memoryBufferSize   int64
 	clusterBuffers     []*ClusterBuffer
 	clusterBufferLocks *lock.KeyLock[int]
@@ -101,7 +102,7 @@ type ClusterBuffer struct {
 	writer       *SegmentWriter
 	bufferRowNum atomic.Int64

-	flushedRowNum  int64
+	flushedRowNum  atomic.Int64
 	flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog

 	uploadedSegments []*datapb.CompactionSegment
@@ -129,10 +130,10 @@ func NewClusteringCompactionTask(
 		plan:               plan,
 		tr:                 timerecord.NewTimeRecorder("clustering_compaction"),
 		done:               make(chan struct{}, 1),
-		spillChan:          make(chan SpillSignal, 100),
+		flushChan:          make(chan SpillSignal, 100),
 		clusterBuffers:     make([]*ClusterBuffer, 0),
 		clusterBufferLocks: lock.NewKeyLock[int](),
-		spillCount:         atomic.NewInt64(0),
+		flushCount:         atomic.NewInt64(0),
 		writtenRowNum:      atomic.NewInt64(0),
 	}
 }
@@ -179,7 +180,8 @@ func (t *clusteringCompactionTask) init() error {
 	t.currentTs = tsoutil.GetCurrentTime()
 	t.memoryBufferSize = t.getMemoryBufferSize()
 	workerPoolSize := t.getWorkerPoolSize()
-	t.pool = conc.NewPool[any](workerPoolSize)
+	t.mappingPool = conc.NewPool[any](workerPoolSize)
+	t.flushPool = conc.NewPool[any](workerPoolSize)
 	log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize))
 	return nil
 }
@@ -253,7 +255,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
 	metrics.DataNodeCompactionLatency.
 		WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).
 		Observe(float64(t.tr.ElapseSpan().Milliseconds()))
-	log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()))
+	log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load()))

 	return planResult, nil
 }
@@ -345,8 +347,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 	inputSegments := t.plan.GetSegmentBinlogs()
 	mapStart := time.Now()

-	// start spill goroutine
-	go t.backgroundSpill(ctx)
+	// start flush goroutine
+	go t.backgroundFlush(ctx)

 	futures := make([]*conc.Future[any], 0, len(inputSegments))
 	for _, segment := range inputSegments {
@@ -355,7 +357,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 			// only FieldBinlogs needed
 			FieldBinlogs: segment.FieldBinlogs,
 		}
-		future := t.pool.Submit(func() (any, error) {
+		future := t.mappingPool.Submit(func() (any, error) {
 			err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts)
 			return struct{}{}, err
 		})
@@ -365,8 +367,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 		return nil, nil, err
 	}

-	// force spill all buffers
-	err := t.spillAll(ctx)
+	// force flush all buffers
+	err := t.flushAll(ctx)
 	if err != nil {
 		return nil, nil, err
 	}
|
@ -405,7 +407,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
|||
return resultSegments, resultPartitionStats, nil
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getWrittenMemoryBufferSize() int64 {
|
||||
func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 {
|
||||
var totalBufferSize int64 = 0
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
|
||||
|
@@ -539,22 +541,21 @@ func (t *clusteringCompactionTask) mappingSegment(
			}
			remained++

-			// currentSize := t.totalBufferSize.Load()
-			if (remained+1)%20 == 0 {
-				currentBufferSize := t.getWrittenMemoryBufferSize()
-				// trigger spill
-				if clusterBuffer.bufferRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
+			if (remained+1)%100 == 0 {
+				currentBufferSize := t.getUsedMemoryBufferSize()
+				// trigger flushBinlog
+				if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
					// reach segment/binlog max size
-					t.spillChan <- SpillSignal{
+					t.flushChan <- SpillSignal{
						buffer: clusterBuffer,
					}
-				} else if currentBufferSize >= t.getMemoryBufferMiddleWatermark() {
-					// reach spill trigger threshold
-					t.spillChan <- SpillSignal{}
+				} else if currentBufferSize >= t.getMemoryBufferHighWatermark() {
+					// reach flushBinlog trigger threshold
+					t.flushChan <- SpillSignal{}
				}

-				// if the total buffer size is too large, block here, wait for memory release by spill
-				if currentBufferSize > t.getMemoryBufferHighWatermark() {
+				// if the total buffer size is too large, block here, wait for memory release by flushBinlog
+				if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() {
				loop:
					for {
						select {
@@ -565,8 +566,8 @@ func (t *clusteringCompactionTask) mappingSegment(
							log.Warn("stop waiting for memory buffer release as task chan done")
							return nil
						default:
-							currentSize := t.getWrittenMemoryBufferSize()
-							if currentSize < t.getMemoryBufferMiddleWatermark() {
+							currentSize := t.getUsedMemoryBufferSize()
+							if currentSize < t.getMemoryBufferLowWatermark() {
								break loop
							}
							time.Sleep(time.Millisecond * 200)
@@ -614,15 +615,19 @@ func (t *clusteringCompactionTask) getMemoryBufferSize() int64 {
 	return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
 }

-func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 {
-	return int64(float64(t.memoryBufferSize) * 0.5)
+func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
+	return int64(float64(t.memoryBufferSize) * 0.3)
 }

 func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
 	return int64(float64(t.memoryBufferSize) * 0.9)
 }

-func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
+func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 {
+	return t.memoryBufferSize
+}
+
+func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
@@ -631,63 +636,68 @@ func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
		case <-t.done:
			log.Info("clustering compaction task done")
			return
-		case signal := <-t.spillChan:
+		case signal := <-t.flushChan:
			var err error
			if signal.buffer == nil {
-				err = t.spillLargestBuffers(ctx)
+				err = t.flushLargestBuffers(ctx)
			} else {
				err = func() error {
					t.clusterBufferLocks.Lock(signal.buffer.id)
					defer t.clusterBufferLocks.Unlock(signal.buffer.id)
-					return t.spill(ctx, signal.buffer)
+					return t.flushBinlog(ctx, signal.buffer)
				}()
			}
			if err != nil {
-				log.Warn("fail to spill data", zap.Error(err))
+				log.Warn("fail to flushBinlog data", zap.Error(err))
				// todo handle error
			}
		}
	}
}

-func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) error {
-	// only one spillLargestBuffers or spillAll should do at the same time
-	t.spillMutex.Lock()
-	defer t.spillMutex.Unlock()
+func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
+	// only one flushLargestBuffers or flushAll should do at the same time
+	getLock := t.flushMutex.TryLock()
+	if !getLock {
+		return nil
+	}
+	defer t.flushMutex.Unlock()
	bufferIDs := make([]int, 0)
	for _, buffer := range t.clusterBuffers {
		bufferIDs = append(bufferIDs, buffer.id)
	}
	sort.Slice(bufferIDs, func(i, j int) bool {
-		return t.clusterBuffers[i].writer.GetRowNum() > t.clusterBuffers[j].writer.GetRowNum()
+		return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load()
	})
-	for index, bufferId := range bufferIDs {
+	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs))
+	for _, bufferId := range bufferIDs {
		err := func() error {
			t.clusterBufferLocks.Lock(bufferId)
			defer t.clusterBufferLocks.Unlock(bufferId)
-			return t.spill(ctx, t.clusterBuffers[bufferId])
+			return t.flushBinlog(ctx, t.clusterBuffers[bufferId])
		}()
		if err != nil {
			return err
		}
-		if index >= len(bufferIDs) {
+		if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() {
+			log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize()))
			break
		}
	}
	return nil
}

-func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
-	// only one spillLargestBuffers or spillAll should do at the same time
-	t.spillMutex.Lock()
-	defer t.spillMutex.Unlock()
+func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
+	// only one flushLargestBuffers or flushAll should do at the same time
+	t.flushMutex.Lock()
+	defer t.flushMutex.Unlock()
	for _, buffer := range t.clusterBuffers {
		err := func() error {
			t.clusterBufferLocks.Lock(buffer.id)
			defer t.clusterBufferLocks.Unlock(buffer.id)
-			err := t.spill(ctx, buffer)
+			err := t.flushBinlog(ctx, buffer)
			if err != nil {
-				log.Error("spill fail")
+				log.Error("flushBinlog fail")
				return err
			}
			err = t.packBufferToSegment(ctx, buffer)
@@ -708,16 +718,16 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
	for _, fieldBinlog := range buffer.flushedBinlogs {
		insertLogs = append(insertLogs, fieldBinlog)
	}
-	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum)
+	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load())
	if err != nil {
		return err
	}

-	// pack current spill data into a segment
+	// pack current flushBinlog data into a segment
	seg := &datapb.CompactionSegment{
		PlanID:              t.plan.GetPlanID(),
		SegmentID:           buffer.writer.GetSegmentID(),
-		NumOfRows:           buffer.flushedRowNum,
+		NumOfRows:           buffer.flushedRowNum.Load(),
		InsertLogs:          insertLogs,
		Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
		Channel:             t.plan.GetChannel(),
@@ -725,27 +735,39 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
	buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
	segmentStats := storage.SegmentStats{
		FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
-		NumRows:    int(buffer.flushedRowNum),
+		NumRows:    int(buffer.flushedRowNum.Load()),
	}
	buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats
	// refresh
	t.refreshBufferWriter(buffer)
-	buffer.flushedRowNum = 0
+	buffer.flushedRowNum.Store(0)
	buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0)
-	log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
+	for _, binlog := range seg.InsertLogs {
+		log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String()))
+	}
+	log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats))
	return nil
}

-func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuffer) error {
-	log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()))
+func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error {
+	log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID()))
	if buffer.writer.IsEmpty() {
		return nil
	}
-	kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer)
-	if err != nil {
-		log.Warn("compact wrong, failed to serialize writer", zap.Error(err))
+
+	future := t.flushPool.Submit(func() (any, error) {
+		kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer)
+		if err != nil {
+			log.Warn("compact wrong, failed to serialize writer", zap.Error(err))
+			return typeutil.NewPair(kvs, partialBinlogs), err
+		}
+		return typeutil.NewPair(kvs, partialBinlogs), nil
+	})
+	if err := conc.AwaitAll(future); err != nil {
		return err
	}
+	kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A
+	partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B

	if err := t.binlogIO.Upload(ctx, kvs); err != nil {
		log.Warn("compact wrong, failed to upload kvs", zap.Error(err))
@@ -760,14 +782,14 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
		}
		buffer.flushedBinlogs[fID] = tmpBinlog
	}
-	buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load()
+	buffer.flushedRowNum.Add(buffer.bufferRowNum.Load())

	// clean buffer
	buffer.bufferRowNum.Store(0)

-	t.spillCount.Inc()
-	log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load()))
-	if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
+	t.flushCount.Inc()
+	log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()))
+	if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() {
		if err := t.packBufferToSegment(ctx, buffer); err != nil {
			return err
		}
@@ -817,7 +839,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
			CollectionID: segment.CollectionID,
			PartitionID:  segment.PartitionID,
		}
-		future := t.pool.Submit(func() (any, error) {
+		future := t.mappingPool.Submit(func() (any, error) {
			analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone)
			mutex.Lock()
			defer mutex.Unlock()
@@ -193,7 +193,7 @@ func (t *mixCompactionTask) merge(
			unflushedRowCount++
			remainingRowCount++

-			if (unflushedRowCount+1)%100 == 0 && writer.IsFull() {
+			if (unflushedRowCount+1)%100 == 0 && writer.FlushAndIsFull() {
				serWriteStart := time.Now()
				kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer)
				if err != nil {
@@ -214,7 +214,7 @@ func (t *mixCompactionTask) merge(
		}
	}

-	if !writer.IsEmpty() {
+	if !writer.FlushAndIsEmpty() {
		serWriteStart := time.Now()
		kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer)
		if err != nil {
|
@ -150,11 +150,19 @@ func (w *SegmentWriter) Finish(actualRowCount int64) (*storage.Blob, error) {
|
|||
}
|
||||
|
||||
func (w *SegmentWriter) IsFull() bool {
|
||||
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
|
||||
}
|
||||
|
||||
func (w *SegmentWriter) FlushAndIsFull() bool {
|
||||
w.writer.Flush()
|
||||
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
|
||||
}
|
||||
|
||||
func (w *SegmentWriter) IsEmpty() bool {
|
||||
return w.writer.WrittenMemorySize() == 0
|
||||
}
|
||||
|
||||
func (w *SegmentWriter) FlushAndIsEmpty() bool {
|
||||
w.writer.Flush()
|
||||
return w.writer.WrittenMemorySize() == 0
|
||||
}
|
||||
|
|
|
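The new FlushAndIsFull / FlushAndIsEmpty variants flush the wrapped writer before reading WrittenMemorySize, presumably so the size and emptiness checks also account for rows still sitting in the serializer's batch buffer rather than only data that has already been serialized. A toy illustration of the difference, with invented names and thresholds:

```go
package main

import "fmt"

// writer is a toy stand-in for the serializing writer wrapped by SegmentWriter:
// rows sit in a small batch buffer until Flush moves them into written bytes.
type writer struct {
	buffered int // rows not yet serialized
	written  int // bytes already serialized
}

func (w *writer) Write(rows int)         { w.buffered += rows }
func (w *writer) Flush()                 { w.written += w.buffered * 8; w.buffered = 0 } // assume 8 bytes/row
func (w *writer) WrittenMemorySize() int { return w.written }

const maxBinlogSize = 64 // bytes, an invented threshold standing in for BinLogMaxSize

// isFull checks only what has already been serialized.
func isFull(w *writer) bool { return w.WrittenMemorySize() > maxBinlogSize }

// flushAndIsFull first drains the batch buffer, so the size check also sees
// rows that were still buffered -- the behaviour the patch adds.
func flushAndIsFull(w *writer) bool {
	w.Flush()
	return w.WrittenMemorySize() > maxBinlogSize
}

func main() {
	w := &writer{}
	w.Write(10) // 10 rows buffered, nothing serialized yet
	fmt.Println("IsFull:", isFull(w))                 // false: buffered rows are invisible
	fmt.Println("FlushAndIsFull:", flushAndIsFull(w)) // true: 80 bytes > 64 after the flush
}
```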
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"sync"

 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
@@ -678,6 +679,7 @@ type SerializeWriter[T any] struct {
 	rw         RecordWriter
 	serializer Serializer[T]
 	batchSize  int
+	mu         sync.Mutex

 	buffer []T
 	pos    int
@@ -685,6 +687,8 @@ type SerializeWriter[T any] struct {
 }

 func (sw *SerializeWriter[T]) Flush() error {
+	sw.mu.Lock()
+	defer sw.mu.Unlock()
 	if sw.pos == 0 {
 		return nil
 	}
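The mu field and the Lock/Unlock added to SerializeWriter.Flush appear intended to make Flush safe now that it can be reached concurrently (for example from the flush pool and the write path at the same time); without the lock, two goroutines draining the same batch could double-write or lose rows. A minimal sketch of the guarded-flush pattern, independent of the real SerializeWriter:

```go
package main

import (
	"fmt"
	"sync"
)

// batchWriter batches items in memory; Flush drains the batch. Guarding both
// operations with a mutex keeps concurrent callers from draining the same
// batch twice, which is the shape of the change to SerializeWriter.Flush here.
type batchWriter struct {
	mu     sync.Mutex
	buffer []string
	out    []string // stands in for the underlying RecordWriter
}

func (w *batchWriter) Append(v string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.buffer = append(w.buffer, v)
}

func (w *batchWriter) Flush() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.buffer) == 0 {
		return
	}
	w.out = append(w.out, w.buffer...)
	w.buffer = w.buffer[:0]
}

func main() {
	w := &batchWriter{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			w.Append(fmt.Sprintf("row-%d", i))
			w.Flush()
		}(i)
	}
	wg.Wait()
	fmt.Println("flushed rows:", len(w.out)) // always 4, no lost or duplicated rows
}
```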