Merge pull request #1249 from otoolep/3_node_raft

Enable join of nodes to existing cluster
pull/1247/head
Philip O'Toole 2014-12-19 17:13:50 -05:00
commit 8b55fda793
9 changed files with 83 additions and 27 deletions

View File

@ -162,6 +162,11 @@ func (c *Config) RaftListenAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Raft.Port)
}
// RaftConnectionString returns the address required to contact the Raft server
func (c *Config) RaftConnectionString() string {
return fmt.Sprintf("http://%s:%d", c.Hostname, c.Raft.Port)
}
// Size represents a TOML parseable file size.
// Users can specify size using "m" for megabytes and "g" for gigabytes.
type Size int
@ -263,10 +268,6 @@ func (c *Config) ProtobufConnectionString() string {
return fmt.Sprintf("%s:%d", c.Hostname, c.ProtobufPort)
}
func (c *Config) RaftConnectionString() string {
return fmt.Sprintf("http://%s:%d", c.Hostname, c.RaftServerPort)
}
func (c *Config) ProtobufListenString() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.ProtobufPort)
}

View File

@ -34,7 +34,7 @@ func execCreateCluster(args []string) {
// Create the broker.
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir); err != nil {
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
log.Fatalf("broker: %s", err.Error())
}

View File

@ -36,14 +36,40 @@ func execJoinCluster(args []string) {
log.Fatalf("node must join as 'combined', 'broker', or 'data'")
}
var seedURLs []*url.URL
for _, s := range strings.Split(*seedServers, ",") {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("seed server: %s", err)
}
seedURLs = append(seedURLs, u)
}
// If joining as broker then create broker.
if *role == "combined" || *role == "broker" {
// Broker required -- but don't initialize it.
// Joining a cluster will do that.
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir); err != nil {
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
log.Fatalf("join: %s", err)
}
// Loop through each, connecting to one must succeed.
joined := false
for _, s := range seedURLs {
err := b.Join(s)
if err != nil {
log.Println("error: join failed to connect to", s, err)
} else {
log.Println("join: connected successfully to", s)
joined = true
break
}
}
if !joined {
log.Fatalf("join: failed to connect to any seed server")
}
}
// If joining as a data node then create a data directory.
@ -57,15 +83,6 @@ func execJoinCluster(args []string) {
}
// Configure the Messaging Client.
var seedURLs []*url.URL
for _, s := range strings.Split(*seedServers, ",") {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("seed server: %s", err)
}
seedURLs = append(seedURLs, u)
}
c := messaging.NewClient("XXX-CHANGEME-XXX")
if err := c.Open(filepath.Join(config.Storage.Dir, messagingClientFile), seedURLs); err != nil {
@ -77,7 +94,7 @@ func execJoinCluster(args []string) {
}
log.Printf("joined cluster at %s", *seedServers)
log.Printf("joined cluster as '%s' at %s", *role, *seedServers)
}
func printJoinClusterUsage() {

View File

@ -81,7 +81,7 @@ func execRun(args []string) {
// If the Broker directory exists, open a Broker on this node.
if brokerDirExists {
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir); err != nil {
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
log.Fatalf("failed to open Broker", err.Error())
}
brokerHandler = messaging.NewHandler(b)
@ -110,10 +110,8 @@ func execRun(args []string) {
}
defer c.Close()
client = c
log.Printf("Cluster messaging client created")
} else {
client = messaging.NewLoopbackClient()
log.Printf("Local messaging client created")
}
server = influxdb.NewServer(client)

View File

@ -51,7 +51,7 @@ func (b *Broker) opened() bool { return b.path != "" }
// Open initializes the log.
// The broker then must be initialized or join a cluster before it can be used.
func (b *Broker) Open(path string) error {
func (b *Broker) Open(path string, addr string) error {
b.mu.Lock()
defer b.mu.Unlock()
@ -61,10 +61,20 @@ func (b *Broker) Open(path string) error {
}
b.path = path
// Open underlying raft log.
// Require a non-blank connection address.
if addr == "" {
return ErrConnectionAddressRequired
}
// Open underlying raft log and set its connection URL.
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
return fmt.Errorf("raft: %s", err)
}
u, err := url.Parse(addr)
if err != nil {
return fmt.Errorf("broker: %s", err)
}
b.log.URL = u
return nil
}
@ -101,6 +111,14 @@ func (b *Broker) Initialize() error {
return nil
}
// Join joins an existing cluster.
func (b *Broker) Join(u *url.URL) error {
if err := b.log.Join(u); err != nil {
return fmt.Errorf("raft: %s", err)
}
return nil
}
// Publish writes a message.
// Returns the index of the message. Otherwise returns an error.
func (b *Broker) Publish(m *Message) (uint64, error) {

View File

@ -16,7 +16,18 @@ import (
// Ensure that opening a broker without a path returns an error.
func TestBroker_Open_ErrPathRequired(t *testing.T) {
b := messaging.NewBroker()
if err := b.Open(""); err != messaging.ErrPathRequired {
if err := b.Open("", "http://127.0.0.1:8080"); err != messaging.ErrPathRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that opening a broker without a connection address returns an error.
func TestBroker_Open_ErrAddressRequired(t *testing.T) {
b := messaging.NewBroker()
f := tempfile()
defer os.Remove(f)
if err := b.Open(f, ""); err != messaging.ErrConnectionAddressRequired {
t.Fatalf("unexpected error: %s", err)
}
}
@ -190,7 +201,7 @@ type Broker struct {
// NewBroker returns a new open tempoarary broker.
func NewBroker() *Broker {
b := messaging.NewBroker()
if err := b.Open(tempfile()); err != nil {
if err := b.Open(tempfile(), "http://127.0.0.1:8080"); err != nil {
panic("open: " + err.Error())
}
if err := b.Initialize(); err != nil {

View File

@ -6,6 +6,9 @@ var (
// ErrPathRequired is returned when opening a broker without a path.
ErrPathRequired = errors.New("path required")
// ErrPathRequired is returned when opening a broker without a connection address.
ErrConnectionAddressRequired = errors.New("connection address required")
// ErrClosed is returned when closing a broker that's already closed.
ErrClosed = errors.New("broker already closed")

View File

@ -248,6 +248,7 @@ func (l *Log) Open(path string) error {
// If this log is the only node then promote to leader immediately.
if c != nil && len(c.Nodes) == 1 && c.Nodes[0].ID == l.id {
l.Logger.Println("log open: promoting to leader immediately")
l.setState(Leader)
}
@ -266,6 +267,9 @@ func (l *Log) Open(path string) error {
l.done = append(l.done, make(chan chan struct{}))
go l.elector(l.done[len(l.done)-1])
l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d",
path, l.id, l.term, l.index)
return nil
}
@ -451,6 +455,9 @@ func (l *Log) Initialize() error {
l.term = term
l.setState(Leader)
l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
config.ClusterID, l.id, l.term)
return nil
}()
if err != nil {
@ -527,6 +534,7 @@ func (l *Log) Join(u *url.URL) error {
// Change to a follower state.
l.setState(Follower)
l.Logger.Println("log join: entered 'follower' state for cluster at", u, " with log ID", l.id)
return nil
}

View File

@ -94,7 +94,7 @@ type HTTPTransport struct{}
func (t *HTTPTransport) Join(uri *url.URL, nodeURL *url.URL) (uint64, *Config, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "join")
u.Path = path.Join(u.Path, "raft/join")
u.RawQuery = (&url.Values{"url": {nodeURL.String()}}).Encode()
// Send HTTP request.
@ -134,7 +134,7 @@ func (t *HTTPTransport) Leave(uri *url.URL, id uint64) error {
func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "heartbeat")
u.Path = path.Join(u.Path, "raft/heartbeat")
// Set URL parameters.
v := &url.Values{}
@ -176,7 +176,7 @@ func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint
func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadCloser, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "stream")
u.Path = path.Join(u.Path, "raft/stream")
// Set URL parameters.
v := &url.Values{}
@ -204,7 +204,7 @@ func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadC
func (t *HTTPTransport) RequestVote(uri *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "vote")
u.Path = path.Join(u.Path, "raft/vote")
// Set URL parameters.
v := &url.Values{}