mirror of https://github.com/milvus-io/milvus.git
enhance: Send flush signal when the water level reaches the high watermark (#34907)
issue: #30633 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/34913/head
parent
575ce91039
commit
260a6e2ba9
|
@ -188,7 +188,7 @@ func (t *clusteringCompactionTask) init() error {
|
||||||
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegments().GetBegin(), t.plan.GetPreAllocatedSegments().GetEnd())
|
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegments().GetBegin(), t.plan.GetPreAllocatedSegments().GetEnd())
|
||||||
t.logIDAlloc = logIDAlloc
|
t.logIDAlloc = logIDAlloc
|
||||||
t.segIDAlloc = segIDAlloc
|
t.segIDAlloc = segIDAlloc
|
||||||
|
|
||||||
var pkField *schemapb.FieldSchema
|
var pkField *schemapb.FieldSchema
|
||||||
if t.plan.Schema == nil {
|
if t.plan.Schema == nil {
|
||||||
return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
|
return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
|
||||||
|
@ -616,7 +616,7 @@ func (t *clusteringCompactionTask) mappingSegment(
|
||||||
pack: pack,
|
pack: pack,
|
||||||
id: clusterBuffer.id,
|
id: clusterBuffer.id,
|
||||||
}
|
}
|
||||||
} else if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() && !t.hasSignal.Load() {
|
} else if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() && !t.hasSignal.Load() {
|
||||||
// reach flushBinlog trigger threshold
|
// reach flushBinlog trigger threshold
|
||||||
log.Debug("largest buffer need to flush",
|
log.Debug("largest buffer need to flush",
|
||||||
zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
|
zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
|
||||||
|
@ -640,8 +640,8 @@ func (t *clusteringCompactionTask) mappingSegment(
|
||||||
default:
|
default:
|
||||||
// currentSize := t.getCurrentBufferWrittenMemorySize()
|
// currentSize := t.getCurrentBufferWrittenMemorySize()
|
||||||
currentSize := t.getBufferTotalUsedMemorySize()
|
currentSize := t.getBufferTotalUsedMemorySize()
|
||||||
if currentSize < t.getMemoryBufferBlockFlushThreshold() {
|
if currentSize < t.getMemoryBufferHighWatermark() {
|
||||||
log.Debug("memory is already below the block watermark, continue writing",
|
log.Debug("memory is already below the high watermark, continue writing",
|
||||||
zap.Int64("currentSize", currentSize))
|
zap.Int64("currentSize", currentSize))
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
@ -693,7 +693,7 @@ func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
|
func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
|
||||||
return int64(float64(t.memoryBufferSize) * 0.9)
|
return int64(float64(t.memoryBufferSize) * 0.7)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 {
|
func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 {
|
||||||
|
|
Loading…
Reference in New Issue