fix(storage/retention): change from string to platform.ID
Co-Authored-by: Jeff Wendling <jeff@influxdata.com>pull/10616/head
parent
735062c798
commit
9298a761de
|
@ -111,7 +111,7 @@ func (s *retentionEnforcer) run() {
|
|||
//
|
||||
// Any series data that (1) belongs to a bucket in the provided map and
|
||||
// (2) falls outside the bucket's indicated retention period will be deleted.
|
||||
func (s *retentionEnforcer) expireData(rpByBucketID map[string]time.Duration, now time.Time) error {
|
||||
func (s *retentionEnforcer) expireData(rpByBucketID map[platform.ID]time.Duration, now time.Time) error {
|
||||
_, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
|
||||
defer logEnd()
|
||||
|
||||
|
@ -124,26 +124,30 @@ func (s *retentionEnforcer) expireData(rpByBucketID map[string]time.Duration, no
|
|||
defer cur.Close()
|
||||
|
||||
var mu sync.Mutex
|
||||
badMSketch := make(map[string]struct{}) // Badly formatted measurements.
|
||||
missingBSketch := make(map[string]struct{}) // Missing buckets.
|
||||
badMSketch := make(map[string]struct{}) // Badly formatted measurements.
|
||||
missingBSketch := make(map[platform.ID]struct{}) // Missing buckets.
|
||||
|
||||
var seriesDeleted uint64 // Number of series where a delete is attempted.
|
||||
var seriesSkipped uint64 // Number of series that were skipped from delete.
|
||||
|
||||
fn := func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
_, bucketID, err := platform.ReadMeasurement(name)
|
||||
if err != nil {
|
||||
if len(name) != platform.IDLength {
|
||||
mu.Lock()
|
||||
badMSketch[string(name)] = struct{}{}
|
||||
mu.Unlock()
|
||||
atomic.AddUint64(&seriesSkipped, 1)
|
||||
return 0, 0, false
|
||||
|
||||
}
|
||||
|
||||
retentionPeriod, ok := rpByBucketID[string(bucketID)]
|
||||
var n [16]byte
|
||||
copy(n[:], name)
|
||||
_, bucketID := tsdb.DecodeName(n)
|
||||
|
||||
retentionPeriod, ok := rpByBucketID[bucketID]
|
||||
if !ok {
|
||||
mu.Lock()
|
||||
missingBSketch[string(bucketID)] = struct{}{}
|
||||
missingBSketch[bucketID] = struct{}{}
|
||||
mu.Unlock()
|
||||
atomic.AddUint64(&seriesSkipped, 1)
|
||||
return 0, 0, false
|
||||
|
@ -180,16 +184,16 @@ func (s *retentionEnforcer) expireData(rpByBucketID map[string]time.Duration, no
|
|||
|
||||
// getRetentionPeriodPerBucket returns a map of (bucket ID -> retention period)
|
||||
// for all buckets.
|
||||
func (s *retentionEnforcer) getRetentionPeriodPerBucket() (map[string]time.Duration, error) {
|
||||
func (s *retentionEnforcer) getRetentionPeriodPerBucket() (map[platform.ID]time.Duration, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
|
||||
defer cancel()
|
||||
buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rpByBucketID := make(map[string]time.Duration, len(buckets))
|
||||
rpByBucketID := make(map[platform.ID]time.Duration, len(buckets))
|
||||
for _, bucket := range buckets {
|
||||
rpByBucketID[string(bucket.ID)] = bucket.RetentionPeriod
|
||||
rpByBucketID[bucket.ID] = bucket.RetentionPeriod
|
||||
}
|
||||
return rpByBucketID, nil
|
||||
}
|
||||
|
|
|
@ -25,14 +25,14 @@ func TestService_expireData(t *testing.T) {
|
|||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := service.expireData(map[string]time.Duration{}, now); err != nil {
|
||||
if err := service.expireData(map[platform.ID]time.Duration{}, now); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Generate some measurement names
|
||||
var names [][]byte
|
||||
rpByBucketID := map[string]time.Duration{}
|
||||
rpByBucketID := map[platform.ID]time.Duration{}
|
||||
expMatchedFrequencies := map[string]int{} // To be used for verifying test results.
|
||||
expRejectedFrequencies := map[string]int{} // To be used for verifying test results.
|
||||
for i := 0; i < 15; i++ {
|
||||
|
@ -42,20 +42,19 @@ func TestService_expireData(t *testing.T) {
|
|||
names = append(names, name)
|
||||
}
|
||||
|
||||
_, bucketID, err := platform.ReadMeasurement(name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var n [16]byte
|
||||
copy(n[:], name)
|
||||
_, bucketID := tsdb.DecodeName(n)
|
||||
|
||||
// Put 1/3rd in the rpByBucketID into the set to delete and 1/3rd into the set
|
||||
// to not delete because no rp, and 1/3rd into the set to not delete because 0 rp.
|
||||
if i%3 == 0 {
|
||||
rpByBucketID[string(bucketID)] = 3 * time.Hour
|
||||
rpByBucketID[bucketID] = 3 * time.Hour
|
||||
expMatchedFrequencies[string(name)] = repeat
|
||||
} else if i%3 == 1 {
|
||||
expRejectedFrequencies[string(name)] = repeat
|
||||
} else if i%3 == 2 {
|
||||
rpByBucketID[string(bucketID)] = 0
|
||||
rpByBucketID[bucketID] = 0
|
||||
expRejectedFrequencies[string(name)] = repeat
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue