diff --git a/config.toml.sample b/config.toml.sample index e076e1c8eb..742a8d121a 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -73,6 +73,8 @@ write-buffer-size = 10000 protobuf_port = 8099 protobuf_timeout = "2s" # the write timeout on the protobuf conn any duration parseable by time.ParseDuration protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration +protobuf_min_backoff = "1s" # the minimum backoff after a failed heartbeat attempt +protobuf_max_backoff = "10s" # the maximum backoff after a failed heartbeat attempt # How many write requests to potentially buffer in memory per server. If the buffer gets filled then writes # will still be logged and once the server has caught up (or come back online) the writes diff --git a/src/cluster/cluster_server.go b/src/cluster/cluster_server.go index 7509226b88..283a6f3df7 100644 --- a/src/cluster/cluster_server.go +++ b/src/cluster/cluster_server.go @@ -1,17 +1,17 @@ package cluster import ( - log "code.google.com/p/log4go" + c "configuration" "errors" "fmt" "net" "protocol" "time" + + log "code.google.com/p/log4go" ) const ( - DEFAULT_BACKOFF = time.Second - MAX_BACKOFF = 10 * time.Second HEARTBEAT_TIMEOUT = 100 * time.Millisecond ) @@ -24,6 +24,8 @@ type ClusterServer struct { connection ServerConnection HeartbeatInterval time.Duration Backoff time.Duration + MinBackoff time.Duration + MaxBackoff time.Duration isUp bool writeBuffer *WriteBuffer heartbeatStarted bool @@ -44,18 +46,17 @@ const ( Potential ) -func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, heartbeatInterval time.Duration) *ClusterServer { - if heartbeatInterval.Nanoseconds() < 1000 { - heartbeatInterval = time.Millisecond * 10 - } +func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer { s := &ClusterServer{ RaftName: 
raftName, RaftConnectionString: raftConnectionString, ProtobufConnectionString: protobufConnectionString, connection: connection, - HeartbeatInterval: heartbeatInterval, - Backoff: DEFAULT_BACKOFF, + HeartbeatInterval: config.ProtobufHeartbeatInterval.Duration, + Backoff: config.ProtobufMinBackoff.Duration, + MinBackoff: config.ProtobufMinBackoff.Duration, + MaxBackoff: config.ProtobufMaxBackoff.Duration, heartbeatStarted: false, } @@ -149,7 +150,7 @@ func (self *ClusterServer) heartbeat() { // otherwise, reset the backoff and mark the server as up self.isUp = true - self.Backoff = DEFAULT_BACKOFF + self.Backoff = self.MinBackoff <-time.After(self.HeartbeatInterval) } } @@ -175,8 +176,8 @@ func (self *ClusterServer) handleHeartbeatError(err error) { log.Warn("Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err) self.isUp = false self.Backoff *= 2 - if self.Backoff > MAX_BACKOFF { - self.Backoff = MAX_BACKOFF + if self.Backoff > self.MaxBackoff { + self.Backoff = self.MaxBackoff } <-time.After(self.Backoff) } diff --git a/src/configuration/config.toml b/src/configuration/config.toml index eb667285f3..cf8265076e 100644 --- a/src/configuration/config.toml +++ b/src/configuration/config.toml @@ -70,6 +70,8 @@ seed-servers = ["hosta:8090", "hostb:8090"] protobuf_port = 8099 protobuf_timeout = "2s" # the write timeout on the protobuf conn any duration parseable by time.ParseDuration protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration +protobuf_min_backoff = "100ms" # the minimum backoff after a failed heartbeat attempt +protobuf_max_backoff = "1s" # the maximum backoff after a failed heartbeat attempt # How many write requests to potentially buffer in memory per server. 
If the buffer gets filled then writes # will still be logged and once the server has caught up (or come back online) the writes diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index 75207baf5b..58315d7e5a 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -89,6 +89,8 @@ type ClusterConfig struct { ProtobufPort int `toml:"protobuf_port"` ProtobufTimeout duration `toml:"protobuf_timeout"` ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"` + MinBackoff duration `toml:"protobuf_min_backoff"` + MaxBackoff duration `toml:"protobuf_max_backoff"` WriteBufferSize int `toml:"write-buffer-size"` ConcurrentShardQueryLimit int `toml:"concurrent-shard-query-limit"` MaxResponseBufferSize int `toml:"max-response-buffer-size"` @@ -204,6 +206,8 @@ type Configuration struct { ProtobufPort int ProtobufTimeout duration ProtobufHeartbeatInterval duration + ProtobufMinBackoff duration + ProtobufMaxBackoff duration Hostname string LogFile string LogLevel string @@ -277,6 +281,18 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { apiReadTimeout = 5 * time.Second } + if tomlConfiguration.Cluster.MinBackoff.Duration == 0 { + tomlConfiguration.Cluster.MinBackoff = duration{time.Second} + } + + if tomlConfiguration.Cluster.MaxBackoff.Duration == 0 { + tomlConfiguration.Cluster.MaxBackoff = duration{10 * time.Second} + } + + if tomlConfiguration.Cluster.ProtobufHeartbeatInterval.Duration == 0 { + tomlConfiguration.Cluster.ProtobufHeartbeatInterval = duration{10 * time.Millisecond} + } + config := &Configuration{ AdminHttpPort: tomlConfiguration.Admin.Port, AdminAssetsDir: tomlConfiguration.Admin.Assets, @@ -293,6 +309,8 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { ProtobufPort: tomlConfiguration.Cluster.ProtobufPort, ProtobufTimeout: tomlConfiguration.Cluster.ProtobufTimeout, ProtobufHeartbeatInterval: tomlConfiguration.Cluster.ProtobufHeartbeatInterval, 
+ ProtobufMinBackoff: tomlConfiguration.Cluster.MinBackoff, + ProtobufMaxBackoff: tomlConfiguration.Cluster.MaxBackoff, SeedServers: tomlConfiguration.Cluster.SeedServers, DataDir: tomlConfiguration.Storage.Dir, LogFile: tomlConfiguration.Logging.File, diff --git a/src/configuration/configuration_test.go b/src/configuration/configuration_test.go index e9eb7591b0..f5a3bec558 100644 --- a/src/configuration/configuration_test.go +++ b/src/configuration/configuration_test.go @@ -46,6 +46,8 @@ func (self *LoadConfigurationSuite) TestConfig(c *C) { c.Assert(config.ProtobufPort, Equals, 8099) c.Assert(config.ProtobufHeartbeatInterval.Duration, Equals, 200*time.Millisecond) + c.Assert(config.ProtobufMinBackoff.Duration, Equals, 100*time.Millisecond) + c.Assert(config.ProtobufMaxBackoff.Duration, Equals, time.Second) c.Assert(config.ProtobufTimeout.Duration, Equals, 2*time.Second) c.Assert(config.SeedServers, DeepEquals, []string{"hosta:8090", "hostb:8090"}) diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index c18ccc42e8..7e7294f5c1 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -371,7 +371,7 @@ func (s *RaftServer) startRaft() error { connectionString, protobufConnectString, nil, - s.config.ProtobufHeartbeatInterval.Duration) + s.config) command := NewAddPotentialServerCommand(clusterServer) _, err = s.doOrProxyCommand(command, "add_server") if err != nil { @@ -614,7 +614,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { command.ConnectionString, command.ProtobufConnectionString, nil, - s.config.ProtobufHeartbeatInterval.Duration) + s.config) addServer := NewAddPotentialServerCommand(clusterServer) if _, err := s.raftServer.Do(addServer); err != nil { log.Error("Error joining raft server: ", err, command)