influxdb/messaging/broker.go

package messaging

import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"sync"

"github.com/influxdb/influxdb/raft"
)

const (
// BroadcastTopicID is the topic used to communicate with all replicas.
BroadcastTopicID = uint64(0)
// MaxSegmentSize represents the largest size a segment can be before a
// new segment is started.
MaxSegmentSize = 10 * 1024 * 1024 // 10MB
)
// Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events.
type Broker struct {
mu sync.RWMutex
path string // data directory
index uint64 // highest applied index
log *raft.Log // internal raft log
replicas map[uint64]*Replica // replica by id
topics map[uint64]*topic // topics by id
Logger *log.Logger
}
// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
log: raft.NewLog(),
replicas: make(map[uint64]*Replica),
topics: make(map[uint64]*topic),
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags),
}
b.log.FSM = (*brokerFSM)(b)
return b
}
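
// A minimal usage sketch (illustrative only, not part of the original file):
// create a broker, open it against a data directory and connection URL,
// initialize a new cluster, and publish a message. The path and URL below
// are assumptions and error handling is elided.
//
//	b := NewBroker()
//	u, _ := url.Parse("http://localhost:8086")
//	_ = b.Open("/tmp/broker", u)
//	_ = b.Initialize()
//	index, _ := b.Publish(&Message{Type: InternalMessageType, TopicID: BroadcastTopicID})
//	_ = b.Sync(index)
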
// Path returns the path used when opening the broker.
// Returns empty string if the broker is not open.
func (b *Broker) Path() string { return b.path }
// Log returns the underlying raft log.
func (b *Broker) Log() *raft.Log { return b.log }
// metaPath returns the file path to the broker's metadata file.
func (b *Broker) metaPath() string {
if b.path == "" {
return ""
}
return filepath.Join(b.path, "meta")
}
// Index returns the highest index seen by the broker across all topics.
// Returns 0 if the broker is closed.
func (b *Broker) Index() uint64 {
b.mu.Lock()
defer b.mu.Unlock()
return b.index
}
// opened returns true if the broker is in an open and running state.
func (b *Broker) opened() bool { return b.path != "" }
// SetLogOutput sets the writer used for all Broker log output.
func (b *Broker) SetLogOutput(w io.Writer) {
b.Logger = log.New(w, "[broker] ", log.LstdFlags)
b.log.SetLogOutput(w)
}
// Open initializes the log.
// The broker must then be initialized or join a cluster before it can be used.
func (b *Broker) Open(path string, u *url.URL) error {
b.mu.Lock()
defer b.mu.Unlock()
// Require a non-blank path.
if path == "" {
return ErrPathRequired
}
b.path = path
// Require a non-blank connection address.
if u == nil {
return ErrConnectionAddressRequired
}
// Read meta data from snapshot.
if err := b.load(); err != nil {
_ = b.close()
return err
}
// Open underlying raft log.
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
_ = b.close()
return fmt.Errorf("raft: %s", err)
}
// Copy connection URL.
b.log.URL = &url.URL{}
*b.log.URL = *u
return nil
}
// Close closes the broker and all topics.
func (b *Broker) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
return b.close()
}
func (b *Broker) close() error {
// Return error if the broker is already closed.
if !b.opened() {
return ErrClosed
}
b.path = ""
// Close all topics & replicas.
b.closeTopics()
b.closeReplicas()
// Close raft log.
_ = b.log.Close()
return nil
}
// closeTopics closes all topic files and clears the topics map.
func (b *Broker) closeTopics() {
for _, t := range b.topics {
_ = t.Close()
}
b.topics = make(map[uint64]*topic)
}
// closeReplicas closes all replica writers and clears the replica map.
func (b *Broker) closeReplicas() {
for _, r := range b.replicas {
r.closeWriter()
}
b.replicas = make(map[uint64]*Replica)
}
// load reads the broker metadata from disk.
func (b *Broker) load() error {
// Open the metadata file.
// Ignore if no snapshot exists.
f, err := os.Open(b.metaPath())
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read snapshot header from disk.
hdr := &snapshotHeader{}
if err := json.NewDecoder(f).Decode(&hdr); err != nil {
return err
}
// Create and open a topic for each topic listed in the snapshot header.
for _, st := range hdr.Topics {
t := b.newTopic(st.ID)
t.index = st.Index
// Open new empty topic file.
if err := t.open(); err != nil {
return fmt.Errorf("open topic: %s", err)
}
}
// Update the replicas.
for _, sr := range hdr.Replicas {
// Create replica.
r := newReplica(b, sr.ID, sr.URL)
b.replicas[r.id] = r
// Append replica's topics.
for _, srt := range sr.Topics {
r.topics[srt.TopicID] = srt.Index
}
}
// Read the highest index from each of the topic files.
if err := b.loadIndex(); err != nil {
return fmt.Errorf("load index: %s", err)
}
return nil
}
// loadIndex reads through all topics to find the highest known index.
func (b *Broker) loadIndex() error {
for _, t := range b.topics {
if err := t.loadIndex(); err != nil {
return fmt.Errorf("topic(%d): %s", t.id, err)
} else if t.index > b.index {
b.index = t.index
}
}
return nil
}
// save persists the broker metadata to disk.
func (b *Broker) save() error {
if b.path == "" {
return ErrClosed
}
// Calculate header under lock.
hdr, err := b.createSnapshotHeader()
if err != nil {
return fmt.Errorf("create snapshot: %s", err)
}
// Write snapshot to disk.
f, err := os.Create(b.metaPath())
if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Write snapshot to disk.
if err := json.NewEncoder(f).Encode(&hdr); err != nil {
return err
}
return nil
}
// mustSave persists the broker metadata to disk. Panic on error.
func (b *Broker) mustSave() {
if err := b.save(); err != nil && err != ErrClosed {
panic(err.Error())
}
}
// createSnapshotHeader creates a snapshot header.
func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) {
// Create parent header.
s := &snapshotHeader{}
// Append topics.
for _, t := range b.topics {
// Retrieve current topic file size.
var sz int64
if t.file != nil {
fi, err := t.file.Stat()
if err != nil {
return nil, err
}
sz = fi.Size()
}
// Append topic to the snapshot.
s.Topics = append(s.Topics, &snapshotTopic{
ID: t.id,
Index: t.index,
Size: sz,
path: t.path,
})
}
// Append replicas and the current index for each topic.
for _, r := range b.replicas {
sr := &snapshotReplica{ID: r.id, URL: r.URL.String()}
for topicID, index := range r.topics {
sr.Topics = append(sr.Topics, &snapshotReplicaTopic{
TopicID: topicID,
Index: index,
})
}
s.Replicas = append(s.Replicas, sr)
}
return s, nil
}
// URL returns the connection url for the broker.
func (b *Broker) URL() *url.URL {
return b.log.URL
}
// LeaderURL returns the connection url for the leader broker.
func (b *Broker) LeaderURL() *url.URL {
_, u := b.log.Leader()
return u
}
// IsLeader returns true if the broker is the current leader.
func (b *Broker) IsLeader() bool { return b.log.State() == raft.Leader }
// Initialize creates a new cluster.
func (b *Broker) Initialize() error {
if err := b.log.Initialize(); err != nil {
return fmt.Errorf("raft: %s", err)
}
return nil
}
// Join joins an existing cluster.
func (b *Broker) Join(u *url.URL) error {
if err := b.log.Join(u); err != nil {
return fmt.Errorf("raft: %s", err)
}
return nil
}
// Publish writes a message.
// Returns the index of the message. Otherwise returns an error.
func (b *Broker) Publish(m *Message) (uint64, error) {
buf, _ := m.MarshalBinary()
return b.log.Apply(buf)
}
// PublishSync writes a message and waits until the change is applied.
func (b *Broker) PublishSync(m *Message) error {
// Publish message.
index, err := b.Publish(m)
if err != nil {
return err
}
// Wait for message to apply.
if err := b.Sync(index); err != nil {
return err
}
return nil
}
// Sync pauses until the given index has been applied.
func (b *Broker) Sync(index uint64) error { return b.log.Wait(index) }
// Replica returns a replica by id.
func (b *Broker) Replica(id uint64) *Replica {
b.mu.RLock()
defer b.mu.RUnlock()
return b.replicas[id]
}
// Replicas returns a list of the replicas in the system
func (b *Broker) Replicas() []*Replica {
b.mu.RLock()
defer b.mu.RUnlock()
a := make([]*Replica, 0, len(b.replicas))
for _, r := range b.replicas {
a = append(a, r)
}
sort.Sort(replicas(a))
return a
}
// newTopic creates a new topic object and registers it with the broker.
func (b *Broker) newTopic(id uint64) *topic {
t := &topic{
id: id,
path: filepath.Join(b.path, strconv.FormatUint(uint64(id), 10)),
replicas: make(map[uint64]*Replica),
}
b.topics[t.id] = t
return t
}
func (b *Broker) createTopicIfNotExists(id uint64) *topic {
if t := b.topics[id]; t != nil {
return t
}
t := b.newTopic(id)
b.mustSave()
return t
}
// CreateReplica creates a new replica with the given id and connection URL.
func (b *Broker) CreateReplica(id uint64, connectURL *url.URL) error {
b.mu.Lock()
defer b.mu.Unlock()
// Ensure replica doesn't already exist.
s := b.replicas[id]
if s != nil {
return ErrReplicaExists
}
// Add command to create replica.
return b.PublishSync(&Message{
Type: CreateReplicaMessageType,
Data: mustMarshalJSON(&CreateReplicaCommand{ID: id, URL: connectURL.String()}),
})
}
func (b *Broker) mustApplyCreateReplica(m *Message) {
var c CreateReplicaCommand
mustUnmarshalJSON(m.Data, &c)
// Create replica.
r := newReplica(b, c.ID, c.URL)
// Automatically subscribe to the config topic.
t := b.createTopicIfNotExists(BroadcastTopicID)
r.topics[BroadcastTopicID] = t.index
// Add replica to the broker.
b.replicas[c.ID] = r
b.mustSave()
}
// DeleteReplica deletes an existing replica by id.
func (b *Broker) DeleteReplica(id uint64) error {
b.mu.Lock()
defer b.mu.Unlock()
// Ensure replica exists.
if s := b.replicas[id]; s == nil {
return ErrReplicaNotFound
}
// Issue command to remove replica.
return b.PublishSync(&Message{
Type: DeleteReplicaMessageType,
Data: mustMarshalJSON(&DeleteReplicaCommand{ID: id}),
})
}
func (b *Broker) mustApplyDeleteReplica(m *Message) {
var c DeleteReplicaCommand
mustUnmarshalJSON(m.Data, &c)
// Find replica.
r := b.replicas[c.ID]
if r == nil {
return
}
// Remove replica from all subscribed topics.
for topicID := range r.topics {
if t := b.topics[topicID]; t != nil {
delete(t.replicas, r.id)
}
}
r.topics = make(map[uint64]uint64)
// Close replica's writer.
r.closeWriter()
// Remove replica from broker.
delete(b.replicas, c.ID)
b.mustSave()
}
// Subscribe adds a subscription to a topic from a replica.
func (b *Broker) Subscribe(replicaID, topicID uint64) error {
b.mu.Lock()
defer b.mu.Unlock()
// TODO: Allow non-zero starting index.
// Ensure replica & topic exist.
if b.replicas[replicaID] == nil {
return ErrReplicaNotFound
}
// Issue command to subscribe to topic.
return b.PublishSync(&Message{
Type: SubscribeMessageType,
Data: mustMarshalJSON(&SubscribeCommand{ReplicaID: replicaID, TopicID: topicID}),
})
}
func (b *Broker) mustApplySubscribe(m *Message) {
var c SubscribeCommand
mustUnmarshalJSON(m.Data, &c)
// Retrieve replica.
r := b.replicas[c.ReplicaID]
if r == nil {
return
}
// Save current index on topic.
t := b.createTopicIfNotExists(c.TopicID)
// Ensure topic is not already subscribed to.
if _, ok := r.topics[c.TopicID]; ok {
b.Logger.Printf("already subscribed to topic: replica=%d, topic=%d", r.id, c.TopicID)
return
}
// Add subscription to replica.
r.topics[c.TopicID] = c.Index
t.replicas[c.ReplicaID] = r
// Catch up replica.
_ = t.writeTo(r, c.Index)
b.mustSave()
}
// Unsubscribe removes a subscription for a topic from a replica.
func (b *Broker) Unsubscribe(replicaID, topicID uint64) error {
b.mu.Lock()
defer b.mu.Unlock()
// Ensure replica & topic exist.
if b.replicas[replicaID] == nil {
return ErrReplicaNotFound
}
// Issue command to unsubscribe from topic.
return b.PublishSync(&Message{
Type: UnsubscribeMessageType,
Data: mustMarshalJSON(&UnsubscribeCommand{ReplicaID: replicaID, TopicID: topicID}),
})
}
func (b *Broker) mustApplyUnsubscribe(m *Message) {
var c UnsubscribeCommand
mustUnmarshalJSON(m.Data, &c)
// Remove topic from replica.
if r := b.replicas[c.ReplicaID]; r != nil {
delete(r.topics, c.TopicID)
}
// Remove replica from topic.
if t := b.topics[c.TopicID]; t != nil {
delete(t.replicas, c.ReplicaID)
}
b.mustSave()
}
// brokerFSM implements the raft.FSM interface for the broker.
// This is implemented as a separate type because it is not meant to be exported.
type brokerFSM Broker
// MustApply executes a raft log entry against the broker.
// Non-repeatable errors such as system or disk errors must panic.
func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
b := (*Broker)(fsm)
// Create a message with the same index as Raft.
m := &Message{}
// Decode commands into messages.
// Convert internal raft entries to no-ops to move the index forward.
if e.Type == raft.LogEntryCommand {
// Decode the message from the raft log.
err := m.UnmarshalBinary(e.Data)
assert(err == nil, "message unmarshal: %s", err)
// Update the broker configuration.
switch m.Type {
case CreateReplicaMessageType:
b.mustApplyCreateReplica(m)
case DeleteReplicaMessageType:
b.mustApplyDeleteReplica(m)
case SubscribeMessageType:
b.mustApplySubscribe(m)
case UnsubscribeMessageType:
b.mustApplyUnsubscribe(m)
}
} else {
// Internal raft commands should be broadcast out as no-ops.
m.TopicID = BroadcastTopicID
m.Type = InternalMessageType
}
// Set the raft index.
m.Index = e.Index
// Write to the topic.
t := b.createTopicIfNotExists(m.TopicID)
if err := t.encode(m); err != nil {
panic("encode: " + err.Error())
}
// Save highest applied index.
b.index = e.Index
}
// Index returns the highest index that the broker has seen.
func (fsm *brokerFSM) Index() (uint64, error) {
b := (*Broker)(fsm)
return b.index, nil
}
// Snapshot streams the current state of the broker and returns the index.
func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
b := (*Broker)(fsm)
// TODO: Prevent truncation during snapshot.
// Calculate header under lock.
b.mu.RLock()
hdr, err := b.createSnapshotHeader()
b.mu.RUnlock()
if err != nil {
return 0, fmt.Errorf("create snapshot: %s", err)
}
// Encode snapshot header.
buf, err := json.Marshal(&hdr)
if err != nil {
return 0, fmt.Errorf("encode snapshot header: %s", err)
}
// Write header frame.
if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil {
return 0, fmt.Errorf("write header size: %s", err)
}
if _, err := w.Write(buf); err != nil {
return 0, fmt.Errorf("write header: %s", err)
}
// Stream each topic sequentially.
for _, t := range hdr.Topics {
if _, err := copyFileN(w, t.path, t.Size); err != nil {
return 0, err
}
}
// Return the snapshot and its last applied index.
return hdr.maxIndex(), nil
}
// Restore reads the broker state.
func (fsm *brokerFSM) Restore(r io.Reader) error {
b := (*Broker)(fsm)
b.mu.Lock()
defer b.mu.Unlock()
// Read header frame.
var sz uint32
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
return fmt.Errorf("read header size: %s", err)
}
buf := make([]byte, sz)
if _, err := io.ReadFull(r, buf); err != nil {
return fmt.Errorf("read header: %s", err)
}
// Decode header.
s := &snapshotHeader{}
if err := json.Unmarshal(buf, &s); err != nil {
return fmt.Errorf("decode header: %s", err)
}
// Close any topics and replicas which might be open and clear them out.
b.closeTopics()
b.closeReplicas()
// Copy topic files from snapshot to local disk.
for _, st := range s.Topics {
t := b.newTopic(st.ID)
t.index = st.Index
// Remove existing file if it exists.
if err := os.Remove(t.path); err != nil && !os.IsNotExist(err) {
return err
}
// Open new empty topic file.
if err := t.open(); err != nil {
return fmt.Errorf("open topic: %s", err)
}
// Copy data from snapshot into file.
if _, err := io.CopyN(t.file, r, st.Size); err != nil {
return fmt.Errorf("copy topic: %s", err)
}
}
// Update the replicas.
for _, sr := range s.Replicas {
// Create replica.
r := newReplica(b, sr.ID, sr.URL)
b.replicas[r.id] = r
// Append replica's topics.
for _, srt := range sr.Topics {
r.topics[srt.TopicID] = srt.Index
}
}
return nil
}
// copyFileN copies n bytes from a path to a writer.
func copyFileN(w io.Writer, path string, n int64) (int64, error) {
// Open file for reading.
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer func() { _ = f.Close() }()
// Copy file up to n bytes.
return io.CopyN(w, f, n)
}
// snapshotHeader represents the header of a snapshot.
type snapshotHeader struct {
Replicas []*snapshotReplica `json:"replicas"`
Topics []*snapshotTopic `json:"topics"`
}
// maxIndex returns the highest applied index across all topics.
func (s *snapshotHeader) maxIndex() uint64 {
var idx uint64
for _, t := range s.Topics {
if t.Index > idx {
idx = t.Index
}
}
return idx
}
type snapshotReplica struct {
ID uint64 `json:"id"`
Topics []*snapshotReplicaTopic `json:"topics"`
URL string `json:"url"`
}
type snapshotTopic struct {
ID uint64 `json:"id"`
Index uint64 `json:"index"`
Size int64 `json:"size"`
path string
}
type snapshotReplicaTopic struct {
TopicID uint64 `json:"topicID"`
Index uint64 `json:"index"`
}
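
// For reference, the JSON produced for the metadata/snapshot header by the
// structs above looks roughly like this (values are illustrative):
//
//	{
//	"replicas": [{"id": 1, "url": "http://localhost:8086", "topics": [{"topicID": 0, "index": 5}]}],
//	"topics": [{"id": 0, "index": 5, "size": 1024}]
//	}
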
// topic represents a single named queue of messages.
// Each topic is identified by a unique path.
//
// Topics write their entries to segmented log files which contain a
// contiguous range of entries. These segments are periodically dropped
// as data is replicated to the replicas and the replicas heartbeat back
// a confirmation of receipt.
type topic struct {
id uint64 // unique identifier
index uint64 // highest index written
path string // on-disk path
segments segments // list of available segments
file *os.File // current segment
replicas map[uint64]*Replica // replicas subscribed to topic
}
// segmentPath returns the path to a segment starting with a given log index.
func (t *topic) segmentPath(index uint64) string {
path := t.path
if path == "" {
return ""
}
return filepath.Join(path, strconv.FormatUint(index, 10))
}
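
// On disk, each topic is a directory named after the topic id beneath the
// broker's data directory, and each segment file inside it is named after
// the index of its first entry. Illustrative layout (paths are examples):
//
//	<broker path>/12/0      first segment of topic 12, starting at index 0
//	<broker path>/12/1000   next segment, starting at index 1000
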
// open opens a topic for writing.
func (t *topic) open() error {
assert(t.file == nil, "topic already open: %d", t.id)
// Ensure the parent directory exists.
if err := os.MkdirAll(t.path, 0700); err != nil {
return err
}
// Read available segments.
if err := t.loadSegments(); err != nil {
return fmt.Errorf("read segments: %s", err)
}
// Open the writer on the latest segment.
f, err := os.OpenFile(t.segments.last().path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
return err
}
t.file = f
return nil
}
// loadSegments reads all available segments for the topic.
// At least one segment will always exist.
func (t *topic) loadSegments() error {
// Open handle to directory.
f, err := os.Open(t.path)
if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read directory items.
fis, err := f.Readdir(0)
if err != nil {
return err
}
// Create a segment for each file with a numeric name.
var a segments
for _, fi := range fis {
index, err := strconv.ParseUint(fi.Name(), 10, 64)
if err != nil {
continue
}
a = append(a, &segment{
index: index,
path: t.segmentPath(index),
size: fi.Size(),
})
}
sort.Sort(a)
// Create a first segment if one doesn't exist.
if len(a) == 0 {
a = segments{&segment{index: 0, path: t.segmentPath(0), size: 0}}
}
t.segments = a
return nil
}
// Close closes the topic's underlying file.
func (t *topic) Close() error {
// Close file.
if t.file != nil {
_ = t.file.Close()
t.file = nil
}
return nil
}
// loadIndex reads the highest available index for a topic from disk.
func (t *topic) loadIndex() error {
// Open topic file for reading.
f, err := os.Open(t.segments.last().path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read all messages.
dec := NewMessageDecoder(bufio.NewReader(f))
for {
// Decode message.
var m Message
if err := dec.Decode(&m); err == io.EOF {
return nil
} else if err != nil {
return fmt.Errorf("decode: %s", err)
}
// Update the topic's highest index.
t.index = m.Index
}
}
// writeTo writes the topic to a replica since a given index.
// Returns an error if the starting index is unavailable.
func (t *topic) writeTo(r *Replica, index uint64) error {
// TODO: If index is too old then return an error.
// Loop over each segment and write if it contains entries after index.
segments := t.segments
for i, s := range segments {
// Determine the maximum index in the range.
var next *segment
if i < len(segments)-1 {
next = segments[i+1]
}
// If the index is after the end of the segment then ignore.
if next != nil && index >= next.index {
continue
}
// Otherwise write segment to replica.
if err := t.writeSegmentTo(r, index, s); err != nil {
return fmt.Errorf("write segment(%d/%d): %s", t.id, s.index, err)
}
}
return nil
}
func (t *topic) writeSegmentTo(r *Replica, index uint64, segment *segment) error {
// Open segment for reading.
// If it doesn't exist then just exit immediately.
f, err := os.Open(segment.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Stream out all messages until EOF.
dec := NewMessageDecoder(bufio.NewReader(f))
for {
// Decode message.
var m Message
if err := dec.Decode(&m); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("decode: %s", err)
}
// Ignore message if it's on or before high water mark.
if m.Index <= index {
continue
}
// Write message out to stream.
_, err := m.WriteTo(r)
if err != nil {
return fmt.Errorf("write to: %s", err)
}
}
return nil
}
// encode writes a message to the end of the topic.
func (t *topic) encode(m *Message) error {
// Ensure the topic is open and ready for writing.
if t.file == nil {
if err := t.open(); err != nil {
return fmt.Errorf("open: %s", err)
}
}
// Ensure message is in-order.
assert(m.Index > t.index, "topic message out of order: %d -> %d", t.index, m.Index)
// Encode message.
b := make([]byte, messageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
// Write to topic file.
if _, err := t.file.Write(b); err != nil {
return fmt.Errorf("encode header: %s", err)
}
// Move up high water mark on the topic.
t.index = m.Index
// Write message out to all replicas.
for _, r := range t.replicas {
_, _ = r.Write(b)
}
return nil
}
// segment represents a contiguous section of a topic log.
type segment struct {
index uint64 // starting index of the segment and name
path string // path to the segment file.
size int64 // total size of the segment file, in bytes.
}
// segments represents a list of segments sorted by index.
type segments []*segment
func (a segments) last() *segment { return a[len(a)-1] }
func (a segments) Len() int { return len(a) }
func (a segments) Less(i, j int) bool { return a[i].index < a[j].index }
func (a segments) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// replicas represents a sortable list of replicas.
type replicas []*Replica
func (a replicas) Len() int { return len(a) }
func (a replicas) Less(i, j int) bool { return a[i].id < a[j].id }
func (a replicas) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Replica represents a collection of subscriptions to topics on the broker.
// The replica maintains the highest index read for each topic so that the
// broker can use this high water mark for trimming the topic logs.
type Replica struct {
URL *url.URL
id uint64
broker *Broker
writer io.Writer // currently attached writer
done chan struct{} // notify when current writer is removed
topics map[uint64]uint64 // current index for each subscribed topic
}
// newReplica returns a new Replica instance associated with a broker.
func newReplica(b *Broker, id uint64, urlstr string) *Replica {
// Parse the URL of the replica.
u, err := url.Parse(urlstr)
if err != nil {
panic(err.Error())
}
return &Replica{
URL: u,
broker: b,
id: id,
topics: make(map[uint64]uint64),
}
}
// closeWriter removes the writer on the replica and closes the notify channel.
func (r *Replica) closeWriter() {
if r.writer != nil {
r.writer = nil
close(r.done)
r.done = nil
}
}
// Topics returns a sorted list of topic ids that the replica is subscribed to.
func (r *Replica) Topics() []uint64 {
a := make([]uint64, 0, len(r.topics))
for topicID := range r.topics {
a = append(a, topicID)
}
sort.Sort(uint64Slice(a))
return a
}
// Write writes a byte slice to the underlying writer.
// If no writer is available then errReplicaUnavailable is returned.
func (r *Replica) Write(p []byte) (int, error) {
// Check if there's a replica available.
if r.writer == nil {
return 0, errReplicaUnavailable
}
// If an error occurs on the write then remove the writer.
n, err := r.writer.Write(p)
if err != nil {
r.closeWriter()
return n, errReplicaUnavailable
}
// If the writer has a flush method then call it.
if w, ok := r.writer.(flusher); ok {
w.Flush()
}
return n, nil
}
// WriteTo begins writing messages to a named stream.
// Only one writer is allowed on a stream at a time.
func (r *Replica) WriteTo(w io.Writer) (int64, error) {
// Close previous writer, if set.
r.closeWriter()
// Set a new writer on the replica.
r.writer = w
done := make(chan struct{})
r.done = done
// Create a topic list with the "config" topic first.
// Configuration changes need to be propagated to make sure topics exist.
ids := make([]uint64, 0, len(r.topics))
for topicID := range r.topics {
ids = append(ids, topicID)
}
sort.Sort(uint64Slice(ids))
// Catch up and attach replica to all subscribed topics.
for _, topicID := range ids {
// Find topic.
t := r.broker.topics[topicID]
assert(t != nil, "topic missing: %d", topicID)
// Write topic messages from last known index.
// The replica can ignore messages it has already seen.
index := r.topics[topicID]
if err := t.writeTo(r, index); err != nil {
r.closeWriter()
return 0, fmt.Errorf("add stream writer: %s", err)
}
// Attach replica to topic to tail new messages.
t.replicas[r.id] = r
}
// Wait for writer to close and then return.
<-done
return 0, nil
}
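
// A sketch of how a consumer attaches to a replica (caller-side code, not
// part of this file): WriteTo blocks until closeWriter is called, so it is
// typically driven from an HTTP handler or a dedicated goroutine, with any
// io.Writer (w below) receiving the framed messages.
//
//	go func() {
//	_, err := r.WriteTo(w)
//	// handle err
//	}()
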
// CreateReplicaCommand creates a new replica.
type CreateReplicaCommand struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
// DeleteReplicaCommand removes a replica.
type DeleteReplicaCommand struct {
ID uint64 `json:"id"`
}
// SubscribeCommand subscribes a replica to a new topic.
type SubscribeCommand struct {
ReplicaID uint64 `json:"replicaID"` // replica id
TopicID uint64 `json:"topicID"` // topic id
Index uint64 `json:"index"` // index
}
// UnsubscribeCommand removes a subscription for a topic from a replica.
type UnsubscribeCommand struct {
ReplicaID uint64 `json:"replicaID"` // replica id
TopicID uint64 `json:"topicID"` // topic id
}
// MessageType represents the type of message.
type MessageType uint16
const (
BrokerMessageType = 0x8000
)
const (
InternalMessageType = BrokerMessageType | MessageType(0x00)
CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)
SubscribeMessageType = BrokerMessageType | MessageType(0x20)
UnsubscribeMessageType = BrokerMessageType | MessageType(0x21)
)
// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 8 + 4
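
// The header layout written by marshalHeader and read by unmarshalHeader is,
// in order and big-endian:
//
//	bytes [0:2)   message type (uint16)
//	bytes [2:10)  topic id     (uint64)
//	bytes [10:18) index        (uint64)
//	bytes [18:22) data length  (uint32)
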
// Message represents a single item in a topic.
type Message struct {
Type MessageType
TopicID uint64
Index uint64
Data []byte
}
// WriteTo encodes and writes the message to a writer. Implements io.WriterTo.
func (m *Message) WriteTo(w io.Writer) (n int64, err error) {
if n, err := w.Write(m.marshalHeader()); err != nil {
return int64(n), err
}
if n, err := w.Write(m.Data); err != nil {
return int64(messageHeaderSize + n), err
}
return int64(messageHeaderSize + len(m.Data)), nil
}
// MarshalBinary returns a binary representation of the message.
// This implements encoding.BinaryMarshaler. An error cannot be returned.
func (m *Message) MarshalBinary() ([]byte, error) {
b := make([]byte, messageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
return b, nil
}
// UnmarshalBinary reads a message from a binary encoded slice.
// This implements encoding.BinaryUnmarshaler.
func (m *Message) UnmarshalBinary(b []byte) error {
m.unmarshalHeader(b)
if len(b[messageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data))
}
copy(m.Data, b[messageHeaderSize:])
return nil
}
// marshalHeader returns a byte slice with the message header.
func (m *Message) marshalHeader() []byte {
b := make([]byte, messageHeaderSize)
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
binary.BigEndian.PutUint64(b[2:10], m.TopicID)
binary.BigEndian.PutUint64(b[10:18], m.Index)
binary.BigEndian.PutUint32(b[18:22], uint32(len(m.Data)))
return b
}
// unmarshalHeader reads message header data from binary encoded slice.
// The data field is appropriately sized but is not filled.
func (m *Message) unmarshalHeader(b []byte) {
m.Type = MessageType(binary.BigEndian.Uint16(b[0:2]))
m.TopicID = binary.BigEndian.Uint64(b[2:10])
m.Index = binary.BigEndian.Uint64(b[10:18])
m.Data = make([]byte, binary.BigEndian.Uint32(b[18:22]))
}
// MessageDecoder decodes messages from a reader.
type MessageDecoder struct {
r io.Reader
}
// NewMessageDecoder returns a new instance of the MessageDecoder.
func NewMessageDecoder(r io.Reader) *MessageDecoder {
return &MessageDecoder{r: r}
}
// Decode reads a message from the decoder's reader.
func (dec *MessageDecoder) Decode(m *Message) error {
// Read header bytes.
var b [messageHeaderSize]byte
if _, err := io.ReadFull(dec.r, b[:]); err != nil {
return err
}
m.unmarshalHeader(b[:])
// Read data.
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
return err
}
return nil
}
type flusher interface {
Flush()
}
// uint64Slice attaches the methods of sort.Interface to []uint64, sorting in increasing order.
type uint64Slice []uint64
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// mustMarshalJSON encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshalJSON(v interface{}) []byte {
b, err := json.Marshal(v)
if err != nil {
panic("marshal: " + err.Error())
}
return b
}
// mustUnmarshalJSON decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshalJSON(b []byte, v interface{}) {
if err := json.Unmarshal(b, v); err != nil {
panic("unmarshal: " + err.Error())
}
}
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }