just store the raft connection string
parent
12da8803da
commit
b28425235e
|
@ -410,10 +410,6 @@ func (self *Configuration) GraphitePortString() string {
|
|||
return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
|
||||
}
|
||||
|
||||
func (self *Configuration) ProtobufPortString() string {
|
||||
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
|
||||
}
|
||||
|
||||
func (self *Configuration) HostnameOrDetect() string {
|
||||
if self.Hostname != "" {
|
||||
return self.Hostname
|
||||
|
@ -430,3 +426,15 @@ func (self *Configuration) HostnameOrDetect() string {
|
|||
func (self *Configuration) ProtobufConnectionString() string {
|
||||
return fmt.Sprintf("%s:%d", self.HostnameOrDetect(), self.ProtobufPort)
|
||||
}
|
||||
|
||||
func (self *Configuration) RaftConnectionString() string {
|
||||
return fmt.Sprintf("http://%s:%d", self.HostnameOrDetect(), self.RaftServerPort)
|
||||
}
|
||||
|
||||
func (self *Configuration) ProtobufListenString() string {
|
||||
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
|
||||
}
|
||||
|
||||
func (self *Configuration) RaftListenString() string {
|
||||
return fmt.Sprintf("%s:%d", self.BindAddress, self.RaftServerPort)
|
||||
}
|
||||
|
|
|
@ -34,8 +34,6 @@ const (
|
|||
// server which acts as the transport.
|
||||
type RaftServer struct {
|
||||
name string
|
||||
host string
|
||||
port int
|
||||
path string
|
||||
bind_address string
|
||||
router *mux.Router
|
||||
|
@ -55,6 +53,7 @@ var registeredCommands bool
|
|||
|
||||
// Creates a new server.
|
||||
func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer {
|
||||
// raft.SetLogLevel(raft.Debug)
|
||||
if !registeredCommands {
|
||||
registeredCommands = true
|
||||
for _, command := range internalRaftCommands {
|
||||
|
@ -63,10 +62,7 @@ func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.C
|
|||
}
|
||||
|
||||
s := &RaftServer{
|
||||
host: config.HostnameOrDetect(),
|
||||
port: config.RaftServerPort,
|
||||
path: config.RaftDir,
|
||||
bind_address: config.BindAddress,
|
||||
clusterConfig: clusterConfig,
|
||||
notLeader: make(chan bool, 1),
|
||||
router: mux.NewRouter(),
|
||||
|
@ -328,7 +324,7 @@ func (s *RaftServer) CommittedAllChanges() bool {
|
|||
}
|
||||
|
||||
func (s *RaftServer) startRaft() error {
|
||||
log.Info("Initializing Raft Server: %s %d", s.path, s.port)
|
||||
log.Info("Initializing Raft Server: %s", s.config.RaftConnectionString())
|
||||
|
||||
// Initialize and start Raft server.
|
||||
transporter := raft.NewHTTPTransporter("/raft")
|
||||
|
@ -358,10 +354,9 @@ func (s *RaftServer) startRaft() error {
|
|||
if len(potentialLeaders) == 0 {
|
||||
log.Info("Starting as new Raft leader...")
|
||||
name := s.raftServer.Name()
|
||||
connectionString := s.connectionString()
|
||||
_, err := s.raftServer.Do(&InfluxJoinCommand{
|
||||
Name: name,
|
||||
ConnectionString: connectionString,
|
||||
ConnectionString: s.config.RaftConnectionString(),
|
||||
ProtobufConnectionString: s.config.ProtobufConnectionString(),
|
||||
})
|
||||
|
||||
|
@ -479,7 +474,7 @@ func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, st
|
|||
}
|
||||
|
||||
func (s *RaftServer) ListenAndServe() error {
|
||||
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.bind_address, s.port))
|
||||
l, err := net.Listen("tcp", s.config.RaftListenString())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -487,7 +482,6 @@ func (s *RaftServer) ListenAndServe() error {
|
|||
}
|
||||
|
||||
func (s *RaftServer) Serve(l net.Listener) error {
|
||||
s.port = l.Addr().(*net.TCPAddr).Port
|
||||
s.listener = l
|
||||
|
||||
log.Info("Initializing Raft HTTP server")
|
||||
|
@ -501,7 +495,7 @@ func (s *RaftServer) Serve(l net.Listener) error {
|
|||
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
|
||||
s.router.HandleFunc("/process_command/{command_type}", s.processCommandHandler).Methods("POST")
|
||||
|
||||
log.Info("Raft Server Listening at %s", s.connectionString())
|
||||
log.Info("Raft Server Listening at %s", s.config.RaftListenString())
|
||||
|
||||
go func() {
|
||||
err := s.httpServer.Serve(l)
|
||||
|
@ -537,7 +531,7 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
|
|||
func (s *RaftServer) Join(leader string) error {
|
||||
command := &InfluxJoinCommand{
|
||||
Name: s.raftServer.Name(),
|
||||
ConnectionString: s.connectionString(),
|
||||
ConnectionString: s.config.RaftConnectionString(),
|
||||
ProtobufConnectionString: s.config.ProtobufConnectionString(),
|
||||
}
|
||||
connectUrl := leader
|
||||
|
|
|
@ -52,7 +52,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
|
|||
|
||||
coord := coordinator.NewCoordinatorImpl(config, raftServer, clusterConfig)
|
||||
requestHandler := coordinator.NewProtobufRequestHandler(coord, clusterConfig)
|
||||
protobufServer := coordinator.NewProtobufServer(config.ProtobufPortString(), requestHandler)
|
||||
protobufServer := coordinator.NewProtobufServer(config.ProtobufListenString(), requestHandler)
|
||||
|
||||
raftServer.AssignCoordinator(coord)
|
||||
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
|
||||
|
|
Loading…
Reference in New Issue