diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go
index f93ea6caa8..0d37911ea2 100644
--- a/internal/datanode/channel_meta.go
+++ b/internal/datanode/channel_meta.go
@@ -288,6 +288,10 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
 		if !seg.isValid() {
 			continue
 		}
+		// skip segments that are already being synced
+		if seg.isSyncing() {
+			continue
+		}
 		validSegs = append(validSegs, seg)
 	}
 
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index cddc08e3b2..88556d03bf 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -687,6 +687,9 @@ func (t *flushBufferInsertTask) flushInsertData() error {
 				metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(len(d)))
 			}
 		}
+		if err != nil {
+			log.Warn("failed to flush insert data", zap.Error(err))
+		}
 		return err
 	}
 	return nil
@@ -710,6 +713,9 @@ func (t *flushBufferDeleteTask) flushDeleteData() error {
 				metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(len(d)))
 			}
 		}
+		if err != nil {
+			log.Warn("failed to flush delete data", zap.Error(err))
+		}
 		return err
 	}
 	return nil
diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go
index 72498efb4b..e452093c40 100644
--- a/internal/datanode/segment_sync_policy.go
+++ b/internal/datanode/segment_sync_policy.go
@@ -18,6 +18,7 @@ package datanode
 
 import (
 	"math"
+	"math/rand"
 	"sort"
 	"time"
 
@@ -40,7 +41,15 @@ func syncPeriodically() segmentSyncPolicy {
 		for _, seg := range segments {
 			endPosTime := tsoutil.PhysicalTime(ts)
 			minBufferTime := tsoutil.PhysicalTime(seg.minBufferTs())
-			shouldSync := endPosTime.Sub(minBufferTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)
+			baseSyncPeriod := Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)
+
+			// add random jitter (up to 10% of the period) so segments do not all flush at the same time
+			jitter := time.Duration(rand.Float64() * 0.1 * float64(baseSyncPeriod))
+
+			// calculate the sync period with jitter
+			syncPeriodWithJitter := baseSyncPeriod + jitter
+			// sync once the buffer has been idle for longer than the jittered period
+			shouldSync := endPosTime.Sub(minBufferTime) >= syncPeriodWithJitter
 			if shouldSync {
 				segmentsToSync = append(segmentsToSync, seg.segmentID)
 			}
diff --git a/internal/datanode/segment_sync_policy_test.go b/internal/datanode/segment_sync_policy_test.go
index bc6a14533d..3e56260a10 100644
--- a/internal/datanode/segment_sync_policy_test.go
+++ b/internal/datanode/segment_sync_policy_test.go
@@ -31,15 +31,17 @@ import (
 
 func TestSyncPeriodically(t *testing.T) {
 	t0 := time.Now()
+	maxTime := time.Duration(1.2 * float64(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)))
+	minTime := time.Duration(0.5 * float64(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)))
 	tests := []struct {
 		testName      string
 		bufferTs      time.Time
 		endPosTs      time.Time
 		isBufferEmpty bool
 		shouldSyncNum int
 	}{
 		{"test buffer empty", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0},
-		{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, 1},
-		{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, 0},
+		{"test buffer not empty and stale", t0, t0.Add(maxTime), false, 1},
+		{"test buffer not empty and not stale", t0, t0.Add(minTime), false, 0},
 	}
 	for _, test := range tests {
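
To make the effect of the policy change concrete, below is a minimal, self-contained sketch of the jittered threshold. The jitteredSyncPeriod helper and the 10-minute base period are illustrative stand-ins, not code from the patch; in the actual policy the base period comes from Params.DataNodeCfg.SyncPeriod.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredSyncPeriod stretches the base sync period by a random jitter of
// up to 10%, mirroring the change in segment_sync_policy.go: segments whose
// buffers went dirty at the same moment no longer all sync on the same tick.
func jitteredSyncPeriod(base time.Duration) time.Duration {
	jitter := time.Duration(rand.Float64() * 0.1 * float64(base))
	return base + jitter
}

func main() {
	base := 10 * time.Minute // hypothetical stand-in for the configured SyncPeriod
	for i := 0; i < 3; i++ {
		// each evaluation draws a fresh jitter in [0, 0.1*base),
		// so the effective threshold lands in [base, 1.1*base)
		fmt.Println(jitteredSyncPeriod(base))
	}
}

Because the threshold always lies in [base, 1.1*base), the updated test stays deterministic despite the randomness: a buffer stale for 1.2x the period (maxTime) is guaranteed to exceed any jittered threshold and must sync, while one stale for only 0.5x the period (minTime) can never reach it and must not.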
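
The new isSyncing() guard in listSegmentIDsToSync can be pictured the same way. The sketch below is a simplification under assumed names: the segment struct and its atomic flag are stand-ins for the DataNode's actual segment type, which tracks sync-in-progress state internally.

package main

import (
	"fmt"
	"sync/atomic"
)

// segment is a simplified stand-in for the DataNode segment type.
type segment struct {
	id      int64
	syncing atomic.Bool // set while a sync for this segment is in flight
}

// listSegmentIDsToSync mirrors the patched loop: segments already being
// synced are skipped, so overlapping sync rounds never queue the same
// segment twice.
func listSegmentIDsToSync(segs []*segment) []int64 {
	var ids []int64
	for _, s := range segs {
		if s.syncing.Load() {
			continue
		}
		ids = append(ids, s.id)
	}
	return ids
}

func main() {
	a, b := &segment{id: 1}, &segment{id: 2}
	b.syncing.Store(true) // pretend segment 2 is mid-sync
	fmt.Println(listSegmentIDsToSync([]*segment{a, b})) // prints [1]
}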