mirror of https://github.com/milvus-io/milvus.git
enhance: Optimize the use of locks and avoid double flush clustering buffer writer (#35486)
issue: #35436

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
parent aaaffc6d53
commit 1bbf7a3c0e
@@ -458,9 +458,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
 	var totalBufferSize int64 = 0
 	for _, buffer := range t.clusterBuffers {
-		t.clusterBufferLocks.RLock(buffer.id)
 		totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load()
-		t.clusterBufferLocks.RUnlock(buffer.id)
 	}
 	return totalBufferSize
 }
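This hunk pairs with the SerializeWriter change further down: once WrittenMemorySize reads an atomic counter (see the @@ -768 hunk), both terms of the sum are lock-free reads and the per-buffer RLock/RUnlock pair becomes unnecessary. A minimal sketch of the resulting loop, assuming go.uber.org/atomic and illustrative type names (not the patch's code):

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type buffer struct {
	writtenMemorySize atomic.Uint64 // stands in for buffer.writer.WrittenMemorySize()
	bufferMemorySize  atomic.Int64  // same name as the field read in the real loop
}

func totalUsed(buffers []*buffer) int64 {
	var total int64
	for _, b := range buffers {
		// No per-buffer lock: each read is individually atomic. The sum is a
		// best-effort snapshot, which is all a watermark check needs.
		total += int64(b.writtenMemorySize.Load()) + b.bufferMemorySize.Load()
	}
	return total
}

func main() {
	b := &buffer{}
	b.writtenMemorySize.Store(2048)
	b.bufferMemorySize.Store(512)
	fmt.Println(totalUsed([]*buffer{b})) // 2560
}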
@@ -593,30 +591,35 @@ func (t *clusteringCompactionTask) mappingSegment(
 			remained++
 
 			if (remained+1)%100 == 0 {
-				t.clusterBufferLocks.RLock(clusterBuffer.id)
-				currentBufferWriterFull := clusterBuffer.writer.IsFull()
-				t.clusterBufferLocks.RUnlock(clusterBuffer.id)
-
 				currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
 
-				currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
-				if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull {
+				if clusterBuffer.currentSegmentRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
 					// reach segment/binlog max size
-					t.clusterBufferLocks.Lock(clusterBuffer.id)
-					writer := clusterBuffer.writer
-					pack, _ := t.refreshBufferWriterWithPack(clusterBuffer)
-					log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
-						zap.Bool("pack", pack),
-						zap.Int64("current segment", writer.GetSegmentID()),
-						zap.Int64("current segment num rows", currentSegmentNumRows),
-						zap.Int64("writer num", writer.GetRowNum()))
-					t.clusterBufferLocks.Unlock(clusterBuffer.id)
-
-					t.flushChan <- FlushSignal{
-						writer: writer,
-						pack:   pack,
-						id:     clusterBuffer.id,
+					flushWriterFunc := func() {
+						t.clusterBufferLocks.Lock(clusterBuffer.id)
+						currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
+						// double-check the condition is still met
+						if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
+							writer := clusterBuffer.writer
+							pack, _ := t.refreshBufferWriterWithPack(clusterBuffer)
+							log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
+								zap.Bool("pack", pack),
+								zap.Int64("current segment", writer.GetSegmentID()),
+								zap.Int64("current segment num rows", currentSegmentNumRows),
+								zap.Int64("writer num", writer.GetRowNum()))
+							t.clusterBufferLocks.Unlock(clusterBuffer.id)
+							// release the lock before sending the signal, avoid long wait caused by a full channel.
+							t.flushChan <- FlushSignal{
+								writer: writer,
+								pack:   pack,
+								id:     clusterBuffer.id,
+							}
+							return
+						}
+						// release the lock even if the conditions are no longer met.
+						t.clusterBufferLocks.Unlock(clusterBuffer.id)
 					}
+					flushWriterFunc()
 				} else if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() && !t.hasSignal.Load() {
 					// reach flushBinlog trigger threshold
 					log.Debug("largest buffer need to flush",
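The refactor wraps the flush in a closure that takes the buffer lock, re-reads the row count, and re-tests the flush condition before acting; that re-check under the lock is what stops two goroutines from flushing the same buffer writer twice, and the signal is sent only after unlocking so a full flushChan cannot stall other lock holders. A minimal sketch of that check / lock / re-check shape, with illustrative names (state, maybeFlush) rather than the patch's:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type state struct {
	mu   sync.Mutex
	rows atomic.Int64
	max  int64
}

func (s *state) maybeFlush(flushCh chan<- int64) {
	// Cheap lock-free pre-check: most calls take this fast path.
	if s.rows.Load() <= s.max {
		return
	}
	s.mu.Lock()
	// Re-check under the lock: a concurrent caller may already have flushed
	// and reset the counter; skipping this re-check is what double-flushes.
	if s.rows.Load() <= s.max {
		s.mu.Unlock()
		return
	}
	n := s.rows.Swap(0)
	s.mu.Unlock()
	// Send only after unlocking, so a full channel never blocks lock holders.
	flushCh <- n
}

func main() {
	ch := make(chan int64, 1)
	s := &state{max: 5}
	s.rows.Store(10)
	s.maybeFlush(ch)
	fmt.Println(<-ch) // 10
}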
@@ -626,7 +629,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 				}
 
 				// if the total buffer size is too large, block here, wait for memory release by flushBinlog
-				if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() {
+				if t.getBufferTotalUsedMemorySize() > t.getMemoryBufferBlockFlushThreshold() {
 					log.Debug("memory is already above the block watermark, pause writing",
 						zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
 				loop:
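Here the block-flush check switches from the currentBufferTotalMemorySize local, captured before the flush branch above ran, to a fresh t.getBufferTotalUsedMemorySize() call, so a flush that just released memory is actually observed. A toy illustration (not Milvus code) of why the stale snapshot misbehaves:

package main

import "fmt"

func main() {
	used := int64(100)
	snapshot := used // like currentBufferTotalMemorySize, captured before the flush

	used -= 80 // a flush in between releases memory

	threshold := int64(50)
	fmt.Println(snapshot > threshold) // true: the stale value would pause writing
	fmt.Println(used > threshold)     // false: a fresh read would not
}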
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/arrow/go/v12/parquet"
 	"github.com/apache/arrow/go/v12/parquet/compress"
 	"github.com/apache/arrow/go/v12/parquet/pqarrow"
+	"go.uber.org/atomic"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -768,7 +769,7 @@ type SerializeWriter[T any] struct {
 
 	buffer            []T
 	pos               int
-	writtenMemorySize uint64
+	writtenMemorySize atomic.Uint64
 }
 
 func (sw *SerializeWriter[T]) Flush() error {
@@ -787,7 +788,7 @@ func (sw *SerializeWriter[T]) Flush() error {
 		return err
 	}
 	sw.pos = 0
-	sw.writtenMemorySize += size
+	sw.writtenMemorySize.Add(size)
 	return nil
 }
 
@@ -806,7 +807,7 @@ func (sw *SerializeWriter[T]) Write(value T) error {
 }
 
 func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
-	return sw.writtenMemorySize
+	return sw.writtenMemorySize.Load()
 }
 
 func (sw *SerializeWriter[T]) Close() error {