Merge pull request #6919 from influxdata/js-6708-drop-writes-from-before-rp-window

Drop writes from before the retention policy time window
pull/6656/head
Jonathan A. Sternberg 2016-07-05 13:37:50 -05:00 committed by GitHub
commit 960517fd02
3 changed files with 64 additions and 5 deletions

View File

@ -81,6 +81,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6911](https://github.com/influxdata/influxdb/issues/6911): Fix fill(previous) when used with math operators.
- [#6934](https://github.com/influxdata/influxdb/pull/6934): Fix regex binary encoding for a measurement.
- [#6942](https://github.com/influxdata/influxdb/pull/6942): Fix panic: truncate the slice when merging the caches.
- [#6708](https://github.com/influxdata/influxdb/issues/6708): Drop writes from before the retention policy time window.
## v0.13.0 [2016-05-12]

View File

@ -21,6 +21,7 @@ const (
statPointWriteReq = "pointReq"
statPointWriteReqLocal = "pointReqLocal"
statWriteOK = "writeOk"
statWriteDrop = "writeDrop"
statWriteTimeout = "writeTimeout"
statWriteErr = "writeError"
statSubWriteOK = "subWriteOk"
@ -169,12 +170,25 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
if err != nil {
return nil, err
}
if rp == nil {
} else if rp == nil {
return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
}
// Find the minimum time for a point if the retention policy has a shard
// group duration. We will automatically drop any points before this time.
// There is a chance of a time on the edge of the shard group duration to
// sneak through even after it has been removed, but the circumstances are
// rare enough and don't matter enough that we don't account for this
// edge case.
min := time.Unix(0, models.MinNanoTime)
if rp.Duration > 0 {
min = time.Now().Add(-rp.Duration)
}
for _, p := range wp.Points {
if p.Time().Before(min) {
continue
}
timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
}
@ -189,7 +203,11 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
mapping := NewShardMapping()
for _, p := range wp.Points {
sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
sg, ok := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
if !ok {
w.statMap.Add(statWriteDrop, 1)
continue
}
sh := sg.ShardFor(p.HashID())
mapping.MapPoint(&sh, p)
}

View File

@ -53,7 +53,8 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
// Ensures the points writer maps a multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", 0, 3)
rp.ShardGroupDuration = time.Hour
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
@ -79,7 +80,9 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
panic("should not get here")
}
c := coordinator.PointsWriter{MetaClient: ms}
c := coordinator.NewPointsWriter()
c.MetaClient = ms
defer c.Close()
pr := &coordinator.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
@ -120,6 +123,43 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
}
}
// Ensures the points writer does not map points beyond the retention policy.
func TestPointsWriter_MapShards_Invalid(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
}
ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return &rp.ShardGroups[0], nil
}
c := coordinator.NewPointsWriter()
c.MetaClient = ms
defer c.Close()
pr := &coordinator.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
}
// Add a point that goes beyond the current retention policy.
pr.AddPoint("cpu", 1.0, time.Now().Add(-2*time.Hour), nil)
var (
shardMappings *coordinator.ShardMapping
err error
)
if shardMappings, err = c.MapShards(pr); err != nil {
t.Fatalf("unexpected an error: %v", err)
}
if exp := 0; len(shardMappings.Points) != exp {
t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
}
}
func TestPointsWriter_WritePoints(t *testing.T) {
tests := []struct {
name string