mirror of https://github.com/milvus-io/milvus.git
fix: frequent flush cause minio rate limit (#28625)
related to #28549 pr: #28626 1. avoid duplicated sync segments under syncing states 2. add jitter to avoid sync segments at the same time Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>pull/29350/head
parent
d9a9eefa49
commit
8e13199da2
|
@ -288,6 +288,10 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
|
|||
if !seg.isValid() {
|
||||
continue
|
||||
}
|
||||
// ignore all segments under syncing
|
||||
if seg.isSyncing() {
|
||||
continue
|
||||
}
|
||||
validSegs = append(validSegs, seg)
|
||||
}
|
||||
|
||||
|
|
|
@ -687,6 +687,7 @@ func (t *flushBufferInsertTask) flushInsertData() error {
|
|||
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(len(d)))
|
||||
}
|
||||
}
|
||||
log.Warn("failed to flush insert data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -710,6 +711,7 @@ func (t *flushBufferDeleteTask) flushDeleteData() error {
|
|||
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(len(d)))
|
||||
}
|
||||
}
|
||||
log.Warn("failed to flush delete data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -18,6 +18,7 @@ package datanode
|
|||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
|
@ -40,7 +41,15 @@ func syncPeriodically() segmentSyncPolicy {
|
|||
for _, seg := range segments {
|
||||
endPosTime := tsoutil.PhysicalTime(ts)
|
||||
minBufferTime := tsoutil.PhysicalTime(seg.minBufferTs())
|
||||
shouldSync := endPosTime.Sub(minBufferTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)
|
||||
baseSyncPeriod := Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)
|
||||
|
||||
// jitter to avoid all flush at the same time
|
||||
jitter := time.Duration(rand.Float64() * 0.1 * float64(baseSyncPeriod))
|
||||
|
||||
// Calculate the sync period with jitter
|
||||
syncPeriodWithJitter := baseSyncPeriod + jitter
|
||||
// Determine if the segment should be synced
|
||||
shouldSync := endPosTime.Sub(minBufferTime) >= syncPeriodWithJitter
|
||||
if shouldSync {
|
||||
segmentsToSync = append(segmentsToSync, seg.segmentID)
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ import (
|
|||
func TestSyncPeriodically(t *testing.T) {
|
||||
t0 := time.Now()
|
||||
|
||||
maxTime := time.Duration(1.2 * float64(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)))
|
||||
minTime := time.Duration(0.5 * float64(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)))
|
||||
tests := []struct {
|
||||
testName string
|
||||
bufferTs time.Time
|
||||
|
@ -39,8 +41,8 @@ func TestSyncPeriodically(t *testing.T) {
|
|||
shouldSyncNum int
|
||||
}{
|
||||
{"test buffer empty", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0},
|
||||
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, 1},
|
||||
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, 0},
|
||||
{"test buffer not empty and stale", t0, t0.Add(maxTime), false, 1},
|
||||
{"test buffer not empty and not stale", t0, t0.Add(minTime), false, 0},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
|
Loading…
Reference in New Issue