From 19f65f50b7e8ca9cce1f2ea2e0fd80f9de92d4d7 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:41:30 -0800 Subject: [PATCH] fix: optimise write window check (#25558) And expose types and methods for Enterprise use. --- coordinator/points_writer.go | 40 ++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index abe05d734a..e97c9c2dd2 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -121,10 +121,37 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) { s.Shards[shardInfo.ID] = shardInfo } -func withinWriteWindow(rp *meta.RetentionPolicyInfo, p models.Point) bool { - if (rp != nil) && - (((rp.FutureWriteLimit > 0) && p.Time().After(time.Now().Add(rp.FutureWriteLimit))) || - ((rp.PastWriteLimit > 0) && p.Time().Before(time.Now().Add(-rp.PastWriteLimit)))) { +type WriteWindow struct { + before time.Time + checkBefore bool + after time.Time + checkAfter bool +} + +func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow { + w := &WriteWindow{checkAfter: false, checkBefore: false} + + if rp == nil { + // Used in tests + return w + } + now := time.Now() + if rp.FutureWriteLimit > 0 { + w.after = now.Add(rp.FutureWriteLimit) + w.checkAfter = true + } + if rp.PastWriteLimit > 0 { + w.before = now.Add(-rp.PastWriteLimit) + w.checkBefore = true + } + return w +} + +func (w *WriteWindow) WithinWindow(t time.Time) bool { + if w.checkBefore && t.Before(w.before) { + return false + } + if w.checkAfter && t.After(w.after) { return false } return true @@ -197,11 +224,12 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) min = time.Now().Add(-rp.Duration) } + ww := NewWriteWindow(rp) for _, p := range wp.Points { // Either the point is outside the scope of the RP, we already have // a suitable shard group for the point, or it is outside the write window // for the RP, and we don't want to unnecessarily create a shard for it - if p.Time().Before(min) || list.Covers(p.Time()) || !withinWriteWindow(rp, p) { + if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) { continue } @@ -221,7 +249,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) mapping := NewShardMapping(len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) - if sg == nil || !withinWriteWindow(rp, p) { + if sg == nil || !ww.WithinWindow(p.Time()) { // We didn't create a shard group because the point was outside the // scope of the RP, or the point is outside the write window for the RP. mapping.Dropped = append(mapping.Dropped, p)