refactor: move goroutine out to function
parent
8f6701d4b1
commit
b6e911d72c
|
@ -1135,32 +1135,8 @@ func (e *Engine) compactFull(ctx context.Context, grp CompactionGroup, wg *sync.
|
|||
return false
|
||||
}
|
||||
|
||||
done := make(chan struct{}) // Closed when compaction finished.
|
||||
go func(lease influxdb.Lease) {
|
||||
ttl, err := lease.TTL(ctx)
|
||||
if err != nil {
|
||||
e.logger.Warn("unable to get TTL for lease on semaphore", zap.Error(err))
|
||||
ttl = influxdb.DefaultLeaseTTL // This is probably a reasonable fallback.
|
||||
}
|
||||
|
||||
// Renew the lease when ttl is halved
|
||||
ticker := time.NewTicker(ttl / 2)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
if err := lease.Release(ctx); err != nil {
|
||||
e.logger.Warn("Lease on sempahore was not released", zap.Error(err))
|
||||
}
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := lease.KeepAlive(ctx); err != nil {
|
||||
e.logger.Warn("Unable to extend lease", zap.Error(err))
|
||||
} else {
|
||||
e.logger.Info("Extended lease on semaphore")
|
||||
}
|
||||
}
|
||||
}
|
||||
}(lease)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
go e.keepLeaseAlive(ctx, lease) // context cancelled when compaction finished.
|
||||
|
||||
e.compactionTracker.IncFullActive()
|
||||
wg.Add(1)
|
||||
|
@ -1175,13 +1151,41 @@ func (e *Engine) compactFull(ctx context.Context, grp CompactionGroup, wg *sync.
|
|||
|
||||
// Release the files in the compaction plan
|
||||
e.CompactionPlan.Release([]CompactionGroup{s.group})
|
||||
close(done)
|
||||
cancel()
|
||||
}()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// keepLeaseAlive blocks, keeping a lease alive until the context is cancelled.
|
||||
func (e *Engine) keepLeaseAlive(ctx context.Context, lease influxdb.Lease) {
|
||||
ttl, err := lease.TTL(ctx)
|
||||
if err != nil {
|
||||
e.logger.Warn("unable to get TTL for lease on semaphore", zap.Error(err))
|
||||
ttl = influxdb.DefaultLeaseTTL // This is probably a reasonable fallback.
|
||||
}
|
||||
|
||||
// Renew the lease when ttl is halved
|
||||
ticker := time.NewTicker(ttl / 2)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
if err := lease.Release(ctx); err != nil {
|
||||
e.logger.Warn("Lease on sempahore was not released", zap.Error(err))
|
||||
}
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := lease.KeepAlive(ctx); err != nil {
|
||||
e.logger.Warn("Unable to extend lease", zap.Error(err))
|
||||
} else {
|
||||
e.logger.Info("Extended lease on semaphore")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// compactionStrategy holds the details of what to do in a compaction.
|
||||
type compactionStrategy struct {
|
||||
group CompactionGroup
|
||||
|
|
Loading…
Reference in New Issue