fix: optimise write window check (#25558)
And expose types and methods for Enterprise use.pull/25589/head
parent
07c261a21a
commit
19f65f50b7
|
@ -121,10 +121,37 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
|
|||
s.Shards[shardInfo.ID] = shardInfo
|
||||
}
|
||||
|
||||
func withinWriteWindow(rp *meta.RetentionPolicyInfo, p models.Point) bool {
|
||||
if (rp != nil) &&
|
||||
(((rp.FutureWriteLimit > 0) && p.Time().After(time.Now().Add(rp.FutureWriteLimit))) ||
|
||||
((rp.PastWriteLimit > 0) && p.Time().Before(time.Now().Add(-rp.PastWriteLimit)))) {
|
||||
type WriteWindow struct {
|
||||
before time.Time
|
||||
checkBefore bool
|
||||
after time.Time
|
||||
checkAfter bool
|
||||
}
|
||||
|
||||
func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow {
|
||||
w := &WriteWindow{checkAfter: false, checkBefore: false}
|
||||
|
||||
if rp == nil {
|
||||
// Used in tests
|
||||
return w
|
||||
}
|
||||
now := time.Now()
|
||||
if rp.FutureWriteLimit > 0 {
|
||||
w.after = now.Add(rp.FutureWriteLimit)
|
||||
w.checkAfter = true
|
||||
}
|
||||
if rp.PastWriteLimit > 0 {
|
||||
w.before = now.Add(-rp.PastWriteLimit)
|
||||
w.checkBefore = true
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *WriteWindow) WithinWindow(t time.Time) bool {
|
||||
if w.checkBefore && t.Before(w.before) {
|
||||
return false
|
||||
}
|
||||
if w.checkAfter && t.After(w.after) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
@ -197,11 +224,12 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
|
|||
min = time.Now().Add(-rp.Duration)
|
||||
}
|
||||
|
||||
ww := NewWriteWindow(rp)
|
||||
for _, p := range wp.Points {
|
||||
// Either the point is outside the scope of the RP, we already have
|
||||
// a suitable shard group for the point, or it is outside the write window
|
||||
// for the RP, and we don't want to unnecessarily create a shard for it
|
||||
if p.Time().Before(min) || list.Covers(p.Time()) || !withinWriteWindow(rp, p) {
|
||||
if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -221,7 +249,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
|
|||
mapping := NewShardMapping(len(wp.Points))
|
||||
for _, p := range wp.Points {
|
||||
sg := list.ShardGroupAt(p.Time())
|
||||
if sg == nil || !withinWriteWindow(rp, p) {
|
||||
if sg == nil || !ww.WithinWindow(p.Time()) {
|
||||
// We didn't create a shard group because the point was outside the
|
||||
// scope of the RP, or the point is outside the write window for the RP.
|
||||
mapping.Dropped = append(mapping.Dropped, p)
|
||||
|
|
Loading…
Reference in New Issue