make the heartbeat backoff configurable

We mainly need this for testing. Before this change the backoff defaulted to 1 second and could grow to 10 seconds before the server attempted another heartbeat.
pull/429/head
parent b88f760521
commit ce0044036b
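For context, a sketch of how a test configuration can now tighten the heartbeat loop. The keys are the ones introduced by this change; the values below are the test-friendly settings from the sample config and are only illustrative:

protobuf_heartbeat = "200ms"    # heartbeat interval between the servers
protobuf_min_backoff = "100ms"  # minimum backoff after a failed heartbeat attempt
protobuf_max_backoff = "1s"     # maximum backoff after a failed heartbeat attempt

When the keys are omitted, parseTomlConfiguration falls back to the previous behaviour: a 1 second minimum backoff, a 10 second maximum backoff, and a 10 millisecond heartbeat interval.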
@@ -73,6 +73,8 @@ write-buffer-size = 10000
 protobuf_port = 8099
 protobuf_timeout = "2s" # the write timeout on the protobuf conn any duration parseable by time.ParseDuration
 protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration
+protobuf_min_backoff = "1s" # the minimum backoff after a failed heartbeat attempt
+protobuf_max_backoff = "10s" # the maximum backoff after a failed heartbeat attempt

 # How many write requests to potentially buffer in memory per server. If the buffer gets filled then writes
 # will still be logged and once the server has caught up (or come back online) the writes
@@ -1,17 +1,17 @@
 package cluster

 import (
-    log "code.google.com/p/log4go"
+    c "configuration"
     "errors"
     "fmt"
     "net"
     "protocol"
     "time"
+
+    log "code.google.com/p/log4go"
 )

 const (
-    DEFAULT_BACKOFF = time.Second
-    MAX_BACKOFF = 10 * time.Second
     HEARTBEAT_TIMEOUT = 100 * time.Millisecond
 )
@@ -24,6 +24,8 @@ type ClusterServer struct {
     connection ServerConnection
     HeartbeatInterval time.Duration
     Backoff time.Duration
+    MinBackoff time.Duration
+    MaxBackoff time.Duration
     isUp bool
     writeBuffer *WriteBuffer
     heartbeatStarted bool
@@ -44,18 +46,17 @@ const (
     Potential
 )

-func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, heartbeatInterval time.Duration) *ClusterServer {
-    if heartbeatInterval.Nanoseconds() < 1000 {
-        heartbeatInterval = time.Millisecond * 10
-    }
+func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer {

     s := &ClusterServer{
         RaftName: raftName,
         RaftConnectionString: raftConnectionString,
         ProtobufConnectionString: protobufConnectionString,
         connection: connection,
-        HeartbeatInterval: heartbeatInterval,
-        Backoff: DEFAULT_BACKOFF,
+        HeartbeatInterval: config.ProtobufHeartbeatInterval.Duration,
+        Backoff: config.ProtobufMinBackoff.Duration,
+        MinBackoff: config.ProtobufMinBackoff.Duration,
+        MaxBackoff: config.ProtobufMaxBackoff.Duration,
         heartbeatStarted: false,
     }
@@ -149,7 +150,7 @@ func (self *ClusterServer) heartbeat() {

         // otherwise, reset the backoff and mark the server as up
         self.isUp = true
-        self.Backoff = DEFAULT_BACKOFF
+        self.Backoff = self.MinBackoff
         <-time.After(self.HeartbeatInterval)
     }
 }
@@ -175,8 +176,8 @@ func (self *ClusterServer) handleHeartbeatError(err error) {
     log.Warn("Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err)
     self.isUp = false
     self.Backoff *= 2
-    if self.Backoff > MAX_BACKOFF {
-        self.Backoff = MAX_BACKOFF
+    if self.Backoff > self.MaxBackoff {
+        self.Backoff = self.MaxBackoff
     }
     <-time.After(self.Backoff)
 }
@@ -70,6 +70,8 @@ seed-servers = ["hosta:8090", "hostb:8090"]
 protobuf_port = 8099
 protobuf_timeout = "2s" # the write timeout on the protobuf conn any duration parseable by time.ParseDuration
 protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration
+protobuf_min_backoff = "100ms" # the minimum backoff after a failed heartbeat attempt
+protobuf_max_backoff = "1s" # the maximum backoff after a failed heartbeat attempt

 # How many write requests to potentially buffer in memory per server. If the buffer gets filled then writes
 # will still be logged and once the server has caught up (or come back online) the writes
@@ -89,6 +89,8 @@ type ClusterConfig struct {
     ProtobufPort int `toml:"protobuf_port"`
     ProtobufTimeout duration `toml:"protobuf_timeout"`
     ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"`
+    MinBackoff duration `toml:"protobuf_min_backoff"`
+    MaxBackoff duration `toml:"protobuf_max_backoff"`
     WriteBufferSize int `toml:"write-buffer-size"`
     ConcurrentShardQueryLimit int `toml:"concurrent-shard-query-limit"`
     MaxResponseBufferSize int `toml:"max-response-buffer-size"`
@@ -204,6 +206,8 @@ type Configuration struct {
     ProtobufPort int
     ProtobufTimeout duration
     ProtobufHeartbeatInterval duration
+    ProtobufMinBackoff duration
+    ProtobufMaxBackoff duration
     Hostname string
     LogFile string
     LogLevel string
@@ -277,6 +281,18 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         apiReadTimeout = 5 * time.Second
     }

+    if tomlConfiguration.Cluster.MinBackoff.Duration == 0 {
+        tomlConfiguration.Cluster.MinBackoff = duration{time.Second}
+    }
+
+    if tomlConfiguration.Cluster.MaxBackoff.Duration == 0 {
+        tomlConfiguration.Cluster.MaxBackoff = duration{10 * time.Second}
+    }
+
+    if tomlConfiguration.Cluster.ProtobufHeartbeatInterval.Duration == 0 {
+        tomlConfiguration.Cluster.ProtobufHeartbeatInterval = duration{10 * time.Millisecond}
+    }
+
     config := &Configuration{
         AdminHttpPort: tomlConfiguration.Admin.Port,
         AdminAssetsDir: tomlConfiguration.Admin.Assets,
@@ -293,6 +309,8 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
         ProtobufTimeout: tomlConfiguration.Cluster.ProtobufTimeout,
         ProtobufHeartbeatInterval: tomlConfiguration.Cluster.ProtobufHeartbeatInterval,
+        ProtobufMinBackoff: tomlConfiguration.Cluster.MinBackoff,
+        ProtobufMaxBackoff: tomlConfiguration.Cluster.MaxBackoff,
         SeedServers: tomlConfiguration.Cluster.SeedServers,
         DataDir: tomlConfiguration.Storage.Dir,
         LogFile: tomlConfiguration.Logging.File,
@@ -46,6 +46,8 @@ func (self *LoadConfigurationSuite) TestConfig(c *C) {

     c.Assert(config.ProtobufPort, Equals, 8099)
     c.Assert(config.ProtobufHeartbeatInterval.Duration, Equals, 200*time.Millisecond)
+    c.Assert(config.ProtobufMinBackoff.Duration, Equals, 100*time.Millisecond)
+    c.Assert(config.ProtobufMaxBackoff.Duration, Equals, time.Second)
     c.Assert(config.ProtobufTimeout.Duration, Equals, 2*time.Second)
     c.Assert(config.SeedServers, DeepEquals, []string{"hosta:8090", "hostb:8090"})
@@ -371,7 +371,7 @@ func (s *RaftServer) startRaft() error {
         connectionString,
         protobufConnectString,
         nil,
-        s.config.ProtobufHeartbeatInterval.Duration)
+        s.config)
     command := NewAddPotentialServerCommand(clusterServer)
     _, err = s.doOrProxyCommand(command, "add_server")
     if err != nil {
@@ -614,7 +614,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
         command.ConnectionString,
         command.ProtobufConnectionString,
         nil,
-        s.config.ProtobufHeartbeatInterval.Duration)
+        s.config)
     addServer := NewAddPotentialServerCommand(clusterServer)
     if _, err := s.raftServer.Do(addServer); err != nil {
         log.Error("Error joining raft server: ", err, command)