refactor: simpify Semaphore interface
parent
b6e911d72c
commit
2727ae3c25
24
semaphore.go
24
semaphore.go
|
@ -2,27 +2,29 @@ package influxdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrNoAcquire is returned when it was not possible to acquire ownership of the
|
||||||
|
// semaphore.
|
||||||
|
var ErrNoAcquire = errors.New("ownership not acquired")
|
||||||
|
|
||||||
// DefaultLeaseTTL is used when a specific lease TTL is not requested.
|
// DefaultLeaseTTL is used when a specific lease TTL is not requested.
|
||||||
const DefaultLeaseTTL = time.Minute
|
const DefaultLeaseTTL = time.Minute
|
||||||
|
|
||||||
// A Semaphore provides an API for requesting ownership of an expirable semaphore.
|
// A Semaphore provides an API for requesting ownership of an expirable semaphore.
|
||||||
//
|
//
|
||||||
// Acquired semaphores have an expiration period. If they're not released or extended
|
// Acquired semaphores have an expiration. If they're not released or kept alive
|
||||||
// during this period then they will expire and ownership of the semaphore will
|
// during this period then they will expire and ownership of the semaphore will
|
||||||
// be lost.
|
// be lost.
|
||||||
|
//
|
||||||
|
// TODO(edd): add AcquireTTL when needed. It should block.
|
||||||
type Semaphore interface {
|
type Semaphore interface {
|
||||||
// TODO(edd): add Acquire and AcquireTTL when needed. These should block.
|
|
||||||
|
|
||||||
// TryAcquire attempts to acquire ownership of the semaphore. TryAcquire
|
// TryAcquire attempts to acquire ownership of the semaphore. TryAcquire
|
||||||
// must not block. Failure to get ownership of the semaphore should be
|
// must not block. Failure to get ownership of the semaphore should be
|
||||||
// signalled to the caller via the return of a nil Lease.
|
// signalled to the caller via the return of the ErrNoAcquire error.
|
||||||
TryAcquire(context.Context) (Lease, error)
|
TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error)
|
||||||
|
|
||||||
// TryAcquireTTL is similar to TryAcquire, but a specific TTL is provided.
|
|
||||||
TryAcquireTTL(ctx context.Context, ttl time.Duration) (Lease, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Lease represents ownership over a semaphore. It gives the owner the ability
|
// A Lease represents ownership over a semaphore. It gives the owner the ability
|
||||||
|
@ -43,11 +45,7 @@ var NopSemaphore Semaphore = nopSemaphore{}
|
||||||
|
|
||||||
type nopSemaphore struct{}
|
type nopSemaphore struct{}
|
||||||
|
|
||||||
func (nopSemaphore) TryAcquire(context.Context) (Lease, error) {
|
func (nopSemaphore) TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error) {
|
||||||
return nopLease{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nopSemaphore) TryAcquireTTL(ctx context.Context, ttl time.Duration) (Lease, error) {
|
|
||||||
return nopLease{}, nil
|
return nopLease{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1124,15 +1124,17 @@ func (e *Engine) compactFull(ctx context.Context, grp CompactionGroup, wg *sync.
|
||||||
ttl = lastCompaction // If the last full compaction took > default ttl then set a new TTL
|
ttl = lastCompaction // If the last full compaction took > default ttl then set a new TTL
|
||||||
}
|
}
|
||||||
|
|
||||||
lease, err := e.fullCompactionSemaphore.TryAcquireTTL(ctx, ttl)
|
lease, err := e.fullCompactionSemaphore.TryAcquire(ctx, ttl)
|
||||||
if err != nil {
|
if err == influxdb.ErrNoAcquire {
|
||||||
e.logger.Warn("Failed to execute full compaction", zap.Error(err), zap.Duration("semaphore_requested_ttl", ttl))
|
|
||||||
e.compactionLimiter.Release()
|
|
||||||
return false
|
|
||||||
} else if lease == nil {
|
|
||||||
e.logger.Info("Cannot acquire semaphore ownership to carry out full compaction", zap.Duration("semaphore_requested_ttl", ttl))
|
e.logger.Info("Cannot acquire semaphore ownership to carry out full compaction", zap.Duration("semaphore_requested_ttl", ttl))
|
||||||
e.compactionLimiter.Release()
|
e.compactionLimiter.Release()
|
||||||
return false
|
return false
|
||||||
|
} else if err != nil {
|
||||||
|
e.logger.Warn("Failed to execute full compaction", zap.Error(err), zap.Duration("semaphore_requested_ttl", ttl))
|
||||||
|
e.compactionLimiter.Release()
|
||||||
|
return false
|
||||||
|
} else if e.fullCompactionSemaphore != influxdb.NopSemaphore {
|
||||||
|
e.logger.Info("Acquired semaphore ownership for full compaction", zap.Duration("semaphore_requested_ttl", ttl))
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
Loading…
Reference in New Issue