Fix restarts and broker redirection.

pull/1391/head
Ben Johnson 2015-01-28 01:09:50 -05:00
parent def5fc4703
commit 3601c3c79f
4 changed files with 228 additions and 66 deletions

View File

@ -50,6 +50,13 @@ func NewBroker() *Broker {
// Returns empty string if the broker is not open.
func (b *Broker) Path() string { return b.path }
func (b *Broker) metaPath() string {
if b.path == "" {
return ""
}
return filepath.Join(b.path, "meta")
}
func (b *Broker) opened() bool { return b.path != "" }
// Open initializes the log.
@ -69,6 +76,12 @@ func (b *Broker) Open(path string, u *url.URL) error {
return ErrConnectionAddressRequired
}
// Read meta data from snapshot.
if err := b.load(); err != nil {
_ = b.close()
return err
}
// Open underlying raft log.
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
return fmt.Errorf("raft: %s", err)
@ -85,7 +98,10 @@ func (b *Broker) Open(path string, u *url.URL) error {
func (b *Broker) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
return b.close()
}
func (b *Broker) close() error {
// Return error if the broker is already closed.
if !b.opened() {
return ErrClosed
@ -118,11 +134,141 @@ func (b *Broker) closeReplicas() {
b.replicas = make(map[uint64]*Replica)
}
// load reads the broker metadata from disk.
func (b *Broker) load() error {
// Read snapshot header from disk.
// Ignore if no snapshot exists.
f, err := os.Open(b.metaPath())
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read snapshot header from disk.
hdr := &snapshotHeader{}
if err := json.NewDecoder(f).Decode(&hdr); err != nil {
return err
}
// Copy topic files from snapshot to local disk.
for _, st := range hdr.Topics {
t := b.createTopic(st.ID)
t.index = st.Index
// Open new empty topic file.
if err := t.open(); err != nil {
return fmt.Errorf("open topic: %s", err)
}
}
// Update the replicas.
for _, sr := range hdr.Replicas {
// Create replica.
r := newReplica(b, sr.ID)
b.replicas[r.id] = r
// Append replica's topics.
for _, srt := range sr.Topics {
r.topics[srt.TopicID] = srt.Index
}
}
// Set the broker's index to the last index seen across all topics.
b.index = hdr.maxIndex()
return nil
}
// save persists the broker metadata to disk.
func (b *Broker) save() error {
if b.path == "" {
return fmt.Errorf("broker not open")
}
// Calculate header under lock.
hdr, err := b.createSnapshotHeader()
if err != nil {
return fmt.Errorf("create snapshot: %s", err)
}
// Write snapshot to disk.
f, err := os.Create(b.metaPath())
if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Write snapshot to disk.
if err := json.NewEncoder(f).Encode(&hdr); err != nil {
return err
}
return nil
}
// mustSave persists the broker metadata to disk. Panic on error.
func (b *Broker) mustSave() {
if err := b.save(); err != nil {
panic(err.Error())
}
}
// createSnapshotHeader creates a snapshot header.
func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) {
// Create parent header.
s := &snapshotHeader{}
// Append topics.
for _, t := range b.topics {
// Retrieve current topic file size.
var sz int64
if t.file != nil {
fi, err := t.file.Stat()
if err != nil {
return nil, err
}
sz = fi.Size()
}
// Append topic to the snapshot.
s.Topics = append(s.Topics, &snapshotTopic{
ID: t.id,
Index: t.index,
Size: sz,
path: t.path,
})
}
// Append replicas and the current index for each topic.
for _, r := range b.replicas {
sr := &snapshotReplica{ID: r.id}
for topicID, index := range r.topics {
sr.Topics = append(sr.Topics, &snapshotReplicaTopic{
TopicID: topicID,
Index: index,
})
}
s.Replicas = append(s.Replicas, sr)
}
return s, nil
}
// URL returns the connection url for the broker.
func (b *Broker) URL() *url.URL {
return b.log.URL
}
// LeaderURL returns the connection url for the leader broker.
func (b *Broker) LeaderURL() *url.URL {
_, u := b.log.Leader()
return u
}
// Initialize creates a new cluster.
func (b *Broker) Initialize() error {
if err := b.log.Initialize(); err != nil {
@ -182,6 +328,10 @@ func (b *Broker) createTopic(id uint64) *topic {
replicas: make(map[uint64]*Replica),
}
b.topics[t.id] = t
// Persist to disk.
b.mustSave()
return t
}
@ -223,6 +373,8 @@ func (b *Broker) mustApplyCreateReplica(m *Message) {
// Add replica to the broker.
b.replicas[c.ID] = r
b.mustSave()
}
// DeleteReplica deletes an existing replica by id.
@ -265,6 +417,8 @@ func (b *Broker) mustApplyDeleteReplica(m *Message) {
// Remove replica from broker.
delete(b.replicas, c.ID)
b.mustSave()
}
// Subscribe adds a subscription to a topic from a replica.
@ -310,6 +464,8 @@ func (b *Broker) mustApplySubscribe(m *Message) {
// Catch up replica.
_, _ = t.writeTo(r, index)
b.mustSave()
}
// Unsubscribe removes a subscription for a topic from a replica.
@ -342,6 +498,8 @@ func (b *Broker) mustApplyUnsubscribe(m *Message) {
if t := b.topics[c.TopicID]; t != nil {
delete(t.replicas, c.ReplicaID)
}
b.mustSave()
}
// brokerFSM implements the raft.FSM interface for the broker.
@ -392,6 +550,10 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
// Save highest applied index.
// TODO: Persist to disk for raft commands.
b.index = e.Index
// HACK: Persist metadata after each apply.
// This should be derived on startup from the topic logs.
b.mustSave()
}
// Index returns the highest index that the broker has seen.
@ -408,7 +570,7 @@ func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
// Calculate header under lock.
b.mu.RLock()
hdr, err := fsm.createSnapshotHeader()
hdr, err := b.createSnapshotHeader()
b.mu.RUnlock()
if err != nil {
return 0, fmt.Errorf("create snapshot: %s", err)
@ -439,47 +601,6 @@ func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
return hdr.maxIndex(), nil
}
// createSnapshotHeader creates a snapshot header.
func (fsm *brokerFSM) createSnapshotHeader() (*snapshotHeader, error) {
b := (*Broker)(fsm)
// Create parent header.
s := &snapshotHeader{}
// Append topics.
for _, t := range b.topics {
// Retrieve current topic file size.
fi, err := t.file.Stat()
if err != nil {
return nil, err
}
// Append topic to the snapshot.
s.Topics = append(s.Topics, &snapshotTopic{
ID: t.id,
Index: t.index,
Size: fi.Size(),
path: t.path,
})
}
// Append replicas and the current index for each topic.
for _, r := range b.replicas {
sr := &snapshotReplica{ID: r.id}
for topicID, index := range r.topics {
sr.Topics = append(sr.Topics, &snapshotReplicaTopic{
TopicID: topicID,
Index: index,
})
}
s.Replicas = append(s.Replicas, sr)
}
return s, nil
}
// Restore reads the broker state.
func (fsm *brokerFSM) Restore(r io.Reader) error {
b := (*Broker)(fsm)

View File

@ -173,22 +173,40 @@ func (c *Client) Close() error {
// Publish sends a message to the broker and returns an index or error.
func (c *Client) Publish(m *Message) (uint64, error) {
// Send the message to the messages endpoint.
u := *c.LeaderURL()
u.Path = "/messaging/messages"
u.RawQuery = url.Values{
"type": {strconv.FormatUint(uint64(m.Type), 10)},
"topicID": {strconv.FormatUint(m.TopicID, 10)},
}.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", bytes.NewReader(m.Data))
if err != nil {
return 0, err
}
defer func() { _ = resp.Body.Close() }()
var resp *http.Response
var err error
// If a non-200 status is returned then an error occurred.
if resp.StatusCode != http.StatusOK {
return 0, errors.New(resp.Header.Get("X-Broker-Error"))
u := *c.LeaderURL()
for {
// Send the message to the messages endpoint.
u.Path = "/messaging/messages"
u.RawQuery = url.Values{
"type": {strconv.FormatUint(uint64(m.Type), 10)},
"topicID": {strconv.FormatUint(m.TopicID, 10)},
}.Encode()
resp, err = http.Post(u.String(), "application/octet-stream", bytes.NewReader(m.Data))
if err != nil {
return 0, err
}
defer resp.Body.Close()
// If a temporary redirect occurs then update the leader and retry.
// If a non-200 status is returned then an error occurred.
if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusOK {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return 0, errors.New(errstr)
}
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
} else {
break
}
}
// Parse broker index.
@ -329,9 +347,12 @@ func (c *Client) streamFromURL(u *url.URL, done chan chan struct{}) error {
// Ensure that we received a 200 OK from the server before streaming.
if resp.StatusCode != http.StatusOK {
time.Sleep(c.ReconnectTimeout)
c.Logger.Printf("reconnecting to broker: %s (status=%d)", u, resp.StatusCode)
return nil
}
c.Logger.Printf("connected to broker: %s", u)
// Continuously decode messages from request body in a separate goroutine.
errNotify := make(chan error, 0)
go func() {

View File

@ -38,6 +38,8 @@ func (h *Handler) SetBroker(b *Broker) {
// ServeHTTP serves an HTTP request.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// h.broker.Logger.Printf("%s %s", r.Method, r.URL.String())
// Delegate raft requests to its own handler.
if strings.HasPrefix(r.URL.Path, "/raft") {
h.raftHandler.ServeHTTP(w, r)
@ -128,7 +130,17 @@ func (h *Handler) publish(w http.ResponseWriter, r *http.Request) {
// Publish message to the broker.
index, err := h.broker.Publish(m)
if err != nil {
if err == raft.ErrNotLeader {
if u := h.broker.LeaderURL(); u != nil {
redirectURL := *r.URL
redirectURL.Scheme = u.Scheme
redirectURL.Host = u.Host
http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
} else {
h.error(w, err, http.StatusInternalServerError)
}
return
} else if err != nil {
h.error(w, err, http.StatusInternalServerError)
return
}

View File

@ -24,10 +24,10 @@ import (
const (
// DefaultHeartbeatInterval is the default time to wait between heartbeats.
DefaultHeartbeatInterval = 50 * time.Millisecond
DefaultHeartbeatInterval = 150 * time.Millisecond
// DefaultElectionTimeout is the default time before starting an election.
DefaultElectionTimeout = 150 * time.Millisecond
DefaultElectionTimeout = 500 * time.Millisecond
// DefaultReconnectTimeout is the default time to wait before reconnecting.
DefaultReconnectTimeout = 10 * time.Millisecond
@ -247,18 +247,24 @@ func (l *Log) Open(path string) error {
}
l.config = c
// If this log is the only node then promote to leader immediately.
if c != nil && len(c.Nodes) == 1 && c.Nodes[0].ID == l.id {
l.Logger.Println("log open: promoting to leader immediately")
l.setState(Leader)
}
// Determine last applied index from FSM.
index, err := l.FSM.Index()
if err != nil {
return err
}
l.index = index
l.appliedIndex = index
l.commitIndex = index
// If this log is the only node then promote to leader immediately.
// Otherwise if there's any configuration then start it as a follower.
if c != nil && len(c.Nodes) == 1 && c.Nodes[0].ID == l.id {
l.Logger.Println("log open: promoting to leader immediately")
l.setState(Leader)
} else if l.config != nil {
l.setState(Follower)
l.lastContact = l.Clock.Now()
}
// Start goroutine to apply logs.
l.done = append(l.done, make(chan chan struct{}))
@ -554,6 +560,8 @@ func (l *Log) Leave() error {
// setState moves the log to a given state.
func (l *Log) setState(state State) {
l.Logger.Printf("log state change: %s => %s", l.state, state)
// Stop previous state.
if l.ch != nil {
close(l.ch)