diff --git a/semaphore.go b/semaphore.go index 31183dc75f..8131dfaaae 100644 --- a/semaphore.go +++ b/semaphore.go @@ -2,27 +2,29 @@ package influxdb import ( "context" + "errors" "time" ) +// ErrNoAcquire is returned when it was not possible to acquire ownership of the +// semaphore. +var ErrNoAcquire = errors.New("ownership not acquired") + // DefaultLeaseTTL is used when a specific lease TTL is not requested. const DefaultLeaseTTL = time.Minute // A Semaphore provides an API for requesting ownership of an expirable semaphore. // -// Acquired semaphores have an expiration period. If they're not released or extended +// Acquired semaphores have an expiration. If they're not released or kept alive // during this period then they will expire and ownership of the semaphore will // be lost. +// +// TODO(edd): add AcquireTTL when needed. It should block. type Semaphore interface { - // TODO(edd): add Acquire and AcquireTTL when needed. These should block. - // TryAcquire attempts to acquire ownership of the semaphore. TryAcquire // must not block. Failure to get ownership of the semaphore should be - // signalled to the caller via the return of a nil Lease. - TryAcquire(context.Context) (Lease, error) - - // TryAcquireTTL is similar to TryAcquire, but a specific TTL is provided. - TryAcquireTTL(ctx context.Context, ttl time.Duration) (Lease, error) + // signalled to the caller via the return of the ErrNoAcquire error. + TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error) } // A Lease represents ownership over a semaphore. It gives the owner the ability @@ -43,11 +45,7 @@ var NopSemaphore Semaphore = nopSemaphore{} type nopSemaphore struct{} -func (nopSemaphore) TryAcquire(context.Context) (Lease, error) { - return nopLease{}, nil -} - -func (nopSemaphore) TryAcquireTTL(ctx context.Context, ttl time.Duration) (Lease, error) { +func (nopSemaphore) TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error) { return nopLease{}, nil } diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 0c745721f6..694a06e755 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -1124,15 +1124,17 @@ func (e *Engine) compactFull(ctx context.Context, grp CompactionGroup, wg *sync. ttl = lastCompaction // If the last full compaction took > default ttl then set a new TTL } - lease, err := e.fullCompactionSemaphore.TryAcquireTTL(ctx, ttl) - if err != nil { - e.logger.Warn("Failed to execute full compaction", zap.Error(err), zap.Duration("semaphore_requested_ttl", ttl)) - e.compactionLimiter.Release() - return false - } else if lease == nil { + lease, err := e.fullCompactionSemaphore.TryAcquire(ctx, ttl) + if err == influxdb.ErrNoAcquire { e.logger.Info("Cannot acquire semaphore ownership to carry out full compaction", zap.Duration("semaphore_requested_ttl", ttl)) e.compactionLimiter.Release() return false + } else if err != nil { + e.logger.Warn("Failed to execute full compaction", zap.Error(err), zap.Duration("semaphore_requested_ttl", ttl)) + e.compactionLimiter.Release() + return false + } else if e.fullCompactionSemaphore != influxdb.NopSemaphore { + e.logger.Info("Acquired semaphore ownership for full compaction", zap.Duration("semaphore_requested_ttl", ttl)) } ctx, cancel := context.WithCancel(ctx)