Merge pull request #4284 from influxdb/hh_backoff
Exponential hinted-handoff interval on failpull/4293/head
commit
0a63bb1883
|
@ -11,6 +11,7 @@
|
||||||
- [#4198](https://github.com/influxdb/influxdb/pull/4198): Add basic cluster-service stats
|
- [#4198](https://github.com/influxdb/influxdb/pull/4198): Add basic cluster-service stats
|
||||||
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
|
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
|
||||||
- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff
|
- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff
|
||||||
|
- [#4284](https://github.com/influxdb/influxdb/pull/4284): Add exponential backoff for hinted-handoff failures
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
|
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
|
||||||
|
|
|
@ -19,26 +19,34 @@ const (
|
||||||
// value of 0 disables the rate limit.
|
// value of 0 disables the rate limit.
|
||||||
DefaultRetryRateLimit = 0
|
DefaultRetryRateLimit = 0
|
||||||
|
|
||||||
// DefaultRetryInterval is the default amout of time the system waits before
|
// DefaultRetryInterval is the default amount of time the system waits before
|
||||||
// attempting to flush hinted handoff queues.
|
// attempting to flush hinted handoff queues. With each failure of a hinted
|
||||||
|
// handoff write, this retry interval increases exponentially until it reaches
|
||||||
|
// the maximum
|
||||||
DefaultRetryInterval = time.Second
|
DefaultRetryInterval = time.Second
|
||||||
|
|
||||||
|
// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
|
||||||
|
// will ever be.
|
||||||
|
DefaultRetryMaxInterval = time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Enabled bool `toml:"enabled"`
|
Enabled bool `toml:"enabled"`
|
||||||
Dir string `toml:"dir"`
|
Dir string `toml:"dir"`
|
||||||
MaxSize int64 `toml:"max-size"`
|
MaxSize int64 `toml:"max-size"`
|
||||||
MaxAge toml.Duration `toml:"max-age"`
|
MaxAge toml.Duration `toml:"max-age"`
|
||||||
RetryRateLimit int64 `toml:"retry-rate-limit"`
|
RetryRateLimit int64 `toml:"retry-rate-limit"`
|
||||||
RetryInterval toml.Duration `toml:"retry-interval"`
|
RetryInterval toml.Duration `toml:"retry-interval"`
|
||||||
|
RetryMaxInterval toml.Duration `toml:"retry-max-interval"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
MaxSize: DefaultMaxSize,
|
MaxSize: DefaultMaxSize,
|
||||||
MaxAge: toml.Duration(DefaultMaxAge),
|
MaxAge: toml.Duration(DefaultMaxAge),
|
||||||
RetryRateLimit: DefaultRetryRateLimit,
|
RetryRateLimit: DefaultRetryRateLimit,
|
||||||
RetryInterval: toml.Duration(DefaultRetryInterval),
|
RetryInterval: toml.Duration(DefaultRetryInterval),
|
||||||
|
RetryMaxInterval: toml.Duration(DefaultRetryMaxInterval),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ func TestConfigParse(t *testing.T) {
|
||||||
if _, err := toml.Decode(`
|
if _, err := toml.Decode(`
|
||||||
enabled = false
|
enabled = false
|
||||||
retry-interval = "10m"
|
retry-interval = "10m"
|
||||||
|
retry-max-interval = "100m"
|
||||||
max-size=2048
|
max-size=2048
|
||||||
max-age="20m"
|
max-age="20m"
|
||||||
retry-rate-limit=1000
|
retry-rate-limit=1000
|
||||||
|
@ -30,6 +31,10 @@ retry-rate-limit=1000
|
||||||
t.Fatalf("unexpected retry interval: got %v, exp %v", c.RetryInterval, exp)
|
t.Fatalf("unexpected retry interval: got %v, exp %v", c.RetryInterval, exp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if exp := 100 * time.Minute; c.RetryMaxInterval.String() != exp.String() {
|
||||||
|
t.Fatalf("unexpected retry max interval: got %v, exp %v", c.RetryMaxInterval, exp)
|
||||||
|
}
|
||||||
|
|
||||||
if exp := 20 * time.Minute; c.MaxAge.String() != exp.String() {
|
if exp := 20 * time.Minute; c.MaxAge.String() != exp.String() {
|
||||||
t.Fatalf("unexpected max age: got %v, exp %v", c.MaxAge, exp)
|
t.Fatalf("unexpected max age: got %v, exp %v", c.MaxAge, exp)
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,17 +119,29 @@ func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) err
|
||||||
|
|
||||||
func (s *Service) retryWrites() {
|
func (s *Service) retryWrites() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
ticker := time.NewTicker(time.Duration(s.cfg.RetryInterval))
|
currInterval := time.Duration(s.cfg.RetryInterval)
|
||||||
defer ticker.Stop()
|
if currInterval > time.Duration(s.cfg.RetryMaxInterval) {
|
||||||
|
currInterval = time.Duration(s.cfg.RetryMaxInterval)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closing:
|
case <-s.closing:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-time.After(currInterval):
|
||||||
s.statMap.Add(processReq, 1)
|
s.statMap.Add(processReq, 1)
|
||||||
if err := s.HintedHandoff.Process(); err != nil && err != io.EOF {
|
if err := s.HintedHandoff.Process(); err != nil && err != io.EOF {
|
||||||
s.statMap.Add(processReqFail, 1)
|
s.statMap.Add(processReqFail, 1)
|
||||||
s.Logger.Printf("retried write failed: %v", err)
|
s.Logger.Printf("retried write failed: %v", err)
|
||||||
|
|
||||||
|
currInterval = currInterval * 2
|
||||||
|
if currInterval > time.Duration(s.cfg.RetryMaxInterval) {
|
||||||
|
currInterval = time.Duration(s.cfg.RetryMaxInterval)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Success! Return to configured interval.
|
||||||
|
currInterval = time.Duration(s.cfg.RetryInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue