Add proper broker recovery.

This commit fixes the broker recovery so that it determines the last index
from the various topic logs instead of persisting the snapshot on every
message that comes in.
pull/1667/head
Ben Johnson 2015-02-21 08:21:51 -07:00
parent 64cc1fb0f6
commit a5692b71ee
2 changed files with 113 additions and 7 deletions

View File

@ -57,6 +57,14 @@ func (b *Broker) metaPath() string {
return filepath.Join(b.path, "meta")
}
// Index returns the highest index seen by the broker.
// Returns 0 if the broker is closed.
func (b *Broker) Index() uint64 {
b.mu.Lock()
b.mu.Unlock()
return b.index
}
func (b *Broker) opened() bool { return b.path != "" }
// SetLogOutput sets writer for all Broker log output.
@ -181,12 +189,26 @@ func (b *Broker) load() error {
}
}
// Set the broker's index to the last index seen across all topics.
b.index = hdr.maxIndex()
// Read the highest index from each of the topic files.
if err := b.loadIndex(); err != nil {
return fmt.Errorf("load index: %s", err)
}
return nil
}
// loadIndex reads through all topics to find the highest known index.
func (b *Broker) loadIndex() error {
for _, t := range b.topics {
if err := t.loadIndex(); err != nil {
return fmt.Errorf("topic(%d): %s", t.id, err)
} else if t.index > b.index {
b.index = t.index
}
}
return nil
}
// save persists the broker metadata to disk.
func (b *Broker) save() error {
if b.path == "" {
@ -568,12 +590,7 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
}
// Save highest applied index.
// TODO: Persist to disk for raft commands.
b.index = e.Index
// HACK: Persist metadata after each apply.
// This should be derived on startup from the topic logs.
b.mustSave()
}
// Index returns the highest index that the broker has seen.
@ -774,6 +791,33 @@ func (t *topic) Close() error {
return nil
}
// loadIndex reads the highest available index for a topic from disk.
func (t *topic) loadIndex() error {
// Open topic file for reading.
f, err := os.Open(t.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read all messages.
dec := NewMessageDecoder(bufio.NewReader(f))
for {
// Decode message.
var m Message
if err := dec.Decode(&m); err == io.EOF {
return nil
} else if err != nil {
return fmt.Errorf("decode: %s", err)
}
// Update the topic's highest index.
t.index = m.Index
}
}
// writeTo writes the topic to a replica since a given index.
// Returns an error if the starting index is unavailable.
func (t *topic) writeTo(r *Replica, index uint64) (int64, error) {

View File

@ -194,6 +194,31 @@ func TestBroker_Unsubscribe_ErrReplicaNotFound(t *testing.T) {
}
}
// Ensure the broker can reopen and recover correctly.
func TestBroker_Reopen(t *testing.T) {
b := NewBroker(nil)
defer b.Close()
b.MustCreateReplica(2000, &url.URL{Host: "localhost"})
b.MustSubscribe(2000, 20)
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
// Close broker and reopen with a new broker instance.
path, u := b.Path(), b.URL()
b.Broker.Close()
b.Broker = messaging.NewBroker()
if err := b.Broker.Open(path, u); err != nil {
t.Fatal(err)
}
// Verify the broker is up to date.
newIndex := b.Index()
if newIndex != index {
t.Fatalf("index mismatch: exp=%d, got=%d", index, newIndex)
}
}
// Benchmarks a single broker without HTTP.
func BenchmarkBroker_Publish(b *testing.B) {
br := NewBroker(nil)
@ -276,6 +301,43 @@ func (b *Broker) MustReadAll(replicaID uint64) (a []*messaging.Message) {
return
}
// MustCreateReplica creates a new replica. Panic on error.
func (b *Broker) MustCreateReplica(replicaID uint64, u *url.URL) {
if err := b.CreateReplica(replicaID, u); err != nil {
panic(err.Error())
}
}
// MustSubscribe subscribes a replica to a topic. Panic on error.
func (b *Broker) MustSubscribe(replicaID, topicID uint64) {
if err := b.Subscribe(replicaID, topicID); err != nil {
panic(err.Error())
}
}
// MustSync syncs to a broker index. Panic on error.
func (b *Broker) MustSync(index uint64) {
if err := b.Sync(index); err != nil {
panic(err.Error())
}
}
// MustPublish publishes a message to the broker. Panic on error.
func (b *Broker) MustPublish(m *messaging.Message) uint64 {
index, err := b.Publish(&messaging.Message{Type: 100, TopicID: 20, Data: []byte("0000")})
if err != nil {
panic(err.Error())
}
return index
}
// MustPublishSync publishes a message to the broker and syncs to that index. Panic on error.
func (b *Broker) MustPublishSync(m *messaging.Message) uint64 {
index := b.MustPublish(m)
b.MustSync(index)
return index
}
// Messages represents a collection of messages.
// This type provides helper functions.
type Messages []*messaging.Message