2014-10-21 02:42:03 +00:00
|
|
|
package messaging
|
2014-10-03 03:13:42 +00:00
|
|
|
|
|
|
|
import (
|
2014-10-13 23:32:12 +00:00
|
|
|
"bufio"
|
2014-10-03 03:13:42 +00:00
|
|
|
"encoding/binary"
|
2014-10-04 17:27:12 +00:00
|
|
|
"encoding/json"
|
2014-10-03 03:13:42 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
2014-10-13 23:32:12 +00:00
|
|
|
"net/url"
|
2014-10-03 03:13:42 +00:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2014-10-12 18:05:03 +00:00
|
|
|
"sort"
|
2014-10-24 04:22:52 +00:00
|
|
|
"strconv"
|
2014-10-03 03:13:42 +00:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/influxdb/influxdb/raft"
|
|
|
|
)
|
|
|
|
|
2014-11-13 05:32:42 +00:00
|
|
|
// BroadcastTopicID is the ID of the topic used to communicate with all
// replicas. Every replica is subscribed to it when it is created.
const BroadcastTopicID uint64 = 0
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-03 03:13:42 +00:00
|
|
|
// Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events.
//
// Replica and subscription changes are made by publishing commands to the
// internal raft log; they take effect when the log applies them through
// brokerFSM.
//
// NOTE(review): mu is taken by the exported methods, but the apply* methods
// (invoked from brokerFSM.Apply) and Replica.WriteTo read and write the maps
// below without holding it — confirm this is safe.
type Broker struct {
	mu   sync.RWMutex
	path string    // data directory; empty when the broker is not open
	log  *raft.Log // internal raft log

	replicas map[string]*Replica // replica by name

	maxTopicID uint64            // autoincrementing sequence (not referenced by code visible here)
	topics     map[uint64]*topic // topics by id
}
|
|
|
|
|
2014-10-21 02:42:03 +00:00
|
|
|
// NewBroker returns a new instance of a Broker with default values.
|
|
|
|
func NewBroker() *Broker {
|
2014-10-03 03:13:42 +00:00
|
|
|
b := &Broker{
|
2014-10-24 04:22:52 +00:00
|
|
|
log: raft.NewLog(),
|
|
|
|
replicas: make(map[string]*Replica),
|
2014-11-10 02:55:53 +00:00
|
|
|
topics: make(map[uint64]*topic),
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
b.log.FSM = (*brokerFSM)(b)
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
// Path returns the path used when opening the broker.
|
|
|
|
// Returns empty string if the broker is not open.
|
|
|
|
func (b *Broker) Path() string { return b.path }
|
|
|
|
|
|
|
|
func (b *Broker) opened() bool { return b.path != "" }
|
|
|
|
|
|
|
|
// Open initializes the log.
|
|
|
|
// The broker then must be initialized or join a cluster before it can be used.
|
2014-12-19 21:00:57 +00:00
|
|
|
func (b *Broker) Open(path string, addr string) error {
|
2014-10-03 03:13:42 +00:00
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
|
|
|
|
|
|
|
// Require a non-blank path.
|
|
|
|
if path == "" {
|
2014-10-12 18:05:03 +00:00
|
|
|
return ErrPathRequired
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
b.path = path
|
|
|
|
|
2014-12-19 22:07:10 +00:00
|
|
|
// Require a non-blank connection address.
|
|
|
|
if addr == "" {
|
|
|
|
return ErrConnectionAddressRequired
|
|
|
|
}
|
|
|
|
|
2014-12-19 16:51:24 +00:00
|
|
|
// Open underlying raft log and set its connection URL.
|
2014-10-03 03:13:42 +00:00
|
|
|
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
|
|
|
|
return fmt.Errorf("raft: %s", err)
|
|
|
|
}
|
2014-12-19 21:00:57 +00:00
|
|
|
u, err := url.Parse(addr)
|
2014-12-19 16:51:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("broker: %s", err)
|
|
|
|
}
|
|
|
|
b.log.URL = u
|
2014-10-03 03:13:42 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the broker and all topics.
|
|
|
|
func (b *Broker) Close() error {
|
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
|
|
|
|
|
|
|
// Return error if the broker is already closed.
|
|
|
|
if !b.opened() {
|
2014-10-12 18:05:03 +00:00
|
|
|
return ErrClosed
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
b.path = ""
|
|
|
|
|
2014-10-12 18:05:03 +00:00
|
|
|
// TODO: Close all topics.
|
2014-10-03 03:13:42 +00:00
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
// Close all replicas.
|
|
|
|
for _, r := range b.replicas {
|
|
|
|
r.closeWriter()
|
|
|
|
}
|
|
|
|
|
2014-10-03 03:13:42 +00:00
|
|
|
// Close raft log.
|
|
|
|
_ = b.log.Close()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Initialize creates a new cluster.
|
|
|
|
func (b *Broker) Initialize() error {
|
|
|
|
if err := b.log.Initialize(); err != nil {
|
|
|
|
return fmt.Errorf("raft: %s", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-12-18 22:07:48 +00:00
|
|
|
// Join joins an existing cluster.
|
|
|
|
func (b *Broker) Join(u *url.URL) error {
|
|
|
|
if err := b.log.Join(u); err != nil {
|
|
|
|
return fmt.Errorf("raft: %s", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Publish writes a message.
|
2014-10-03 03:13:42 +00:00
|
|
|
// Returns the index of the message. Otherwise returns an error.
|
2014-10-24 04:22:52 +00:00
|
|
|
func (b *Broker) Publish(m *Message) (uint64, error) {
|
2014-10-17 15:53:10 +00:00
|
|
|
buf, _ := m.MarshalBinary()
|
|
|
|
return b.log.Apply(buf)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// PublishSync writes a message and waits until the change is applied.
|
|
|
|
func (b *Broker) PublishSync(m *Message) error {
|
|
|
|
// Publish message.
|
|
|
|
index, err := b.Publish(m)
|
2014-10-12 18:05:03 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Wait for message to apply.
|
|
|
|
if err := b.Sync(index); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Sync pauses until the given index has been applied.
|
|
|
|
func (b *Broker) Sync(index uint64) error {
|
2014-10-03 03:13:42 +00:00
|
|
|
return b.log.Wait(index)
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Replica returns a replica by name.
|
|
|
|
func (b *Broker) Replica(name string) *Replica {
|
2014-10-12 18:05:03 +00:00
|
|
|
b.mu.RLock()
|
|
|
|
defer b.mu.RUnlock()
|
2014-10-13 23:32:12 +00:00
|
|
|
return b.replicas[name]
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-12 18:05:03 +00:00
|
|
|
// initializes a new topic object.
|
2014-11-10 02:55:53 +00:00
|
|
|
func (b *Broker) createTopic(id uint64) *topic {
|
2014-10-12 18:05:03 +00:00
|
|
|
t := &topic{
|
2014-10-13 23:32:12 +00:00
|
|
|
id: id,
|
2014-10-24 04:22:52 +00:00
|
|
|
path: filepath.Join(b.path, strconv.FormatUint(uint64(id), 10)),
|
2014-10-13 23:32:12 +00:00
|
|
|
replicas: make(map[string]*Replica),
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
b.topics[t.id] = t
|
2014-10-24 04:22:52 +00:00
|
|
|
return t
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-11-10 02:55:53 +00:00
|
|
|
func (b *Broker) createTopicIfNotExists(id uint64) *topic {
|
2014-10-24 04:22:52 +00:00
|
|
|
if t := b.topics[id]; t != nil {
|
|
|
|
return t
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
2014-10-24 04:22:52 +00:00
|
|
|
return b.createTopic(id)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// CreateReplica creates a new named replica.
|
|
|
|
func (b *Broker) CreateReplica(name string) error {
|
2014-10-12 18:05:03 +00:00
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
2014-10-04 17:27:12 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Ensure replica doesn't already exist.
|
|
|
|
s := b.replicas[name]
|
2014-10-12 18:05:03 +00:00
|
|
|
if s != nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return ErrReplicaExists
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Add command to create replica.
|
2014-10-24 04:22:52 +00:00
|
|
|
return b.PublishSync(&Message{
|
2014-10-13 23:32:12 +00:00
|
|
|
Type: CreateReplicaMessageType,
|
2014-11-05 05:32:17 +00:00
|
|
|
Data: mustMarshalJSON(&CreateReplicaCommand{Name: name}),
|
2014-10-12 18:05:03 +00:00
|
|
|
})
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
func (b *Broker) applyCreateReplica(m *Message) {
|
|
|
|
var c CreateReplicaCommand
|
2014-11-05 05:32:17 +00:00
|
|
|
mustUnmarshalJSON(m.Data, &c)
|
2014-10-24 04:22:52 +00:00
|
|
|
|
|
|
|
// Create replica.
|
|
|
|
r := newReplica(b, c.Name)
|
2014-10-12 18:05:03 +00:00
|
|
|
|
|
|
|
// Automatically subscribe to the config topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
t := b.createTopicIfNotExists(BroadcastTopicID)
|
|
|
|
r.topics[BroadcastTopicID] = t.index
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Add replica to the broker.
|
2014-10-24 04:22:52 +00:00
|
|
|
b.replicas[c.Name] = r
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// DeleteReplica deletes an existing replica by name.
|
|
|
|
func (b *Broker) DeleteReplica(name string) error {
|
2014-10-12 18:05:03 +00:00
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Ensure replica exists.
|
|
|
|
if s := b.replicas[name]; s == nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return ErrReplicaNotFound
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Issue command to remove replica.
|
2014-10-24 04:22:52 +00:00
|
|
|
return b.PublishSync(&Message{
|
2014-10-13 23:32:12 +00:00
|
|
|
Type: DeleteReplicaMessageType,
|
2014-11-05 05:32:17 +00:00
|
|
|
Data: mustMarshalJSON(&DeleteReplicaCommand{Name: name}),
|
2014-10-12 18:05:03 +00:00
|
|
|
})
|
|
|
|
}
|
2014-10-03 03:13:42 +00:00
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
func (b *Broker) applyDeleteReplica(m *Message) {
|
|
|
|
var c DeleteReplicaCommand
|
2014-11-05 05:32:17 +00:00
|
|
|
mustUnmarshalJSON(m.Data, &c)
|
2014-10-24 04:22:52 +00:00
|
|
|
|
2014-10-15 03:42:40 +00:00
|
|
|
// Find replica.
|
2014-10-24 04:22:52 +00:00
|
|
|
r := b.replicas[c.Name]
|
|
|
|
if r == nil {
|
|
|
|
return
|
|
|
|
}
|
2014-10-15 03:42:40 +00:00
|
|
|
|
|
|
|
// Remove replica from all subscribed topics.
|
2014-10-24 04:22:52 +00:00
|
|
|
for topicID := range r.topics {
|
|
|
|
if t := b.topics[topicID]; t != nil {
|
|
|
|
delete(t.replicas, r.name)
|
2014-10-15 03:42:40 +00:00
|
|
|
}
|
|
|
|
}
|
2014-11-10 02:55:53 +00:00
|
|
|
r.topics = make(map[uint64]uint64)
|
2014-10-15 03:42:40 +00:00
|
|
|
|
|
|
|
// Close replica's writer.
|
|
|
|
r.closeWriter()
|
|
|
|
|
|
|
|
// Remove replica from broker.
|
2014-10-24 04:22:52 +00:00
|
|
|
delete(b.replicas, c.Name)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
2014-10-03 03:13:42 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Subscribe adds a subscription to a topic from a replica.
|
2014-11-10 02:55:53 +00:00
|
|
|
func (b *Broker) Subscribe(replica string, topicID uint64) error {
|
2014-10-12 18:05:03 +00:00
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Ensure replica & topic exist.
|
|
|
|
if b.replicas[replica] == nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return ErrReplicaNotFound
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
2014-10-12 18:05:03 +00:00
|
|
|
// Issue command to subscribe to topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
return b.PublishSync(&Message{
|
2014-10-12 18:05:03 +00:00
|
|
|
Type: SubscribeMessageType,
|
2014-11-05 05:32:17 +00:00
|
|
|
Data: mustMarshalJSON(&SubscribeCommand{Replica: replica, TopicID: topicID}),
|
2014-10-12 18:05:03 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// applySubscribe is called when the SubscribeCommand is applied.
|
2014-10-24 04:22:52 +00:00
|
|
|
func (b *Broker) applySubscribe(m *Message) {
|
|
|
|
var c SubscribeCommand
|
2014-11-05 05:32:17 +00:00
|
|
|
mustUnmarshalJSON(m.Data, &c)
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Retrieve replica.
|
|
|
|
r := b.replicas[c.Replica]
|
|
|
|
if r == nil {
|
|
|
|
return
|
|
|
|
}
|
2014-10-12 18:05:03 +00:00
|
|
|
|
|
|
|
// Save current index on topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
t := b.createTopicIfNotExists(c.TopicID)
|
2014-10-12 18:05:03 +00:00
|
|
|
index := t.index
|
|
|
|
|
|
|
|
// Ensure topic is not already subscribed to.
|
2014-10-24 04:22:52 +00:00
|
|
|
if _, ok := r.topics[c.TopicID]; ok {
|
2014-10-12 18:05:03 +00:00
|
|
|
warn("already subscribed to topic")
|
2014-10-24 04:22:52 +00:00
|
|
|
return
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Add subscription to replica.
|
2014-10-24 04:22:52 +00:00
|
|
|
r.topics[c.TopicID] = index
|
|
|
|
t.replicas[c.Replica] = r
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Catch up replica.
|
2014-11-13 05:32:42 +00:00
|
|
|
_, _ = t.writeTo(r, index)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Unsubscribe removes a subscription for a topic from a replica.
|
2014-11-10 02:55:53 +00:00
|
|
|
func (b *Broker) Unsubscribe(replica string, topicID uint64) error {
|
2014-10-12 18:05:03 +00:00
|
|
|
b.mu.Lock()
|
|
|
|
defer b.mu.Unlock()
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Ensure replica & topic exist.
|
|
|
|
if b.replicas[replica] == nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return ErrReplicaNotFound
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Issue command to unsubscribe from topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
return b.PublishSync(&Message{
|
2014-10-12 18:05:03 +00:00
|
|
|
Type: UnsubscribeMessageType,
|
2014-11-05 05:32:17 +00:00
|
|
|
Data: mustMarshalJSON(&UnsubscribeCommand{Replica: replica, TopicID: topicID}),
|
2014-10-12 18:05:03 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
func (b *Broker) applyUnsubscribe(m *Message) {
|
|
|
|
var c UnsubscribeCommand
|
2014-11-05 05:32:17 +00:00
|
|
|
mustUnmarshalJSON(m.Data, &c)
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Remove topic from replica.
|
|
|
|
if r := b.replicas[c.Replica]; r != nil {
|
|
|
|
delete(r.topics, c.TopicID)
|
|
|
|
}
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
// Remove replica from topic.
|
|
|
|
if t := b.topics[c.TopicID]; t != nil {
|
|
|
|
delete(t.replicas, c.Replica)
|
|
|
|
}
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// brokerFSM implements the raft.FSM interface for the broker.
|
|
|
|
// This is implemented as a separate type because it is not meant to be exported.
|
|
|
|
type brokerFSM Broker
|
|
|
|
|
|
|
|
// Apply executes a raft log entry against the broker.
|
|
|
|
func (fsm *brokerFSM) Apply(e *raft.LogEntry) error {
|
|
|
|
b := (*Broker)(fsm)
|
|
|
|
|
|
|
|
// Ignore internal raft entries.
|
|
|
|
if e.Type != raft.LogEntryCommand {
|
|
|
|
// TODO: Save index.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// Decode the message from the raft log.
|
|
|
|
m := &Message{}
|
|
|
|
err := m.UnmarshalBinary(e.Data)
|
|
|
|
assert(err == nil, "message unmarshal: %s", err)
|
|
|
|
|
|
|
|
// Add the raft index to the message.
|
2014-10-03 03:13:42 +00:00
|
|
|
m.Index = e.Index
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-04 17:27:12 +00:00
|
|
|
// Update the broker configuration.
|
|
|
|
switch m.Type {
|
2014-10-13 23:32:12 +00:00
|
|
|
case CreateReplicaMessageType:
|
2014-10-24 04:22:52 +00:00
|
|
|
b.applyCreateReplica(m)
|
2014-10-13 23:32:12 +00:00
|
|
|
case DeleteReplicaMessageType:
|
2014-10-24 04:22:52 +00:00
|
|
|
b.applyDeleteReplica(m)
|
2014-10-12 18:05:03 +00:00
|
|
|
case SubscribeMessageType:
|
2014-10-24 04:22:52 +00:00
|
|
|
b.applySubscribe(m)
|
2014-10-12 18:05:03 +00:00
|
|
|
case UnsubscribeMessageType:
|
2014-10-24 04:22:52 +00:00
|
|
|
b.applyUnsubscribe(m)
|
2014-10-03 03:13:42 +00:00
|
|
|
}
|
|
|
|
|
2014-10-12 18:05:03 +00:00
|
|
|
// Write to the topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
t := b.createTopicIfNotExists(m.TopicID)
|
2014-10-12 18:05:03 +00:00
|
|
|
if err := t.encode(m); err != nil {
|
|
|
|
return fmt.Errorf("encode: %s", err)
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-03 03:13:42 +00:00
|
|
|
// Index returns the highest index that the broker has seen.
|
|
|
|
func (fsm *brokerFSM) Index() (uint64, error) {
|
|
|
|
// TODO: Retrieve index.
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Snapshot streams the current state of the broker and returns the index.
|
|
|
|
func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
|
|
|
|
// TODO: Prevent truncation during snapshot.
|
|
|
|
// TODO: Lock and calculate header.
|
|
|
|
// TODO: Retrieve snapshot index.
|
|
|
|
// TODO: Stream each topic.
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Restore reads the broker state.
|
|
|
|
func (fsm *brokerFSM) Restore(r io.Reader) error {
|
|
|
|
// TODO: Read header.
|
|
|
|
// TODO: Read in each file.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// topic represents a single queue of messages, identified by a numeric id.
// Messages are appended to an on-disk file at path and forwarded to the
// replicas currently attached to the topic.
type topic struct {
	id    uint64 // unique identifier
	index uint64 // highest index written

	path string   // on-disk path
	file *os.File // on-disk representation; nil when not open for writing

	replicas map[string]*Replica // replicas subscribed to topic
}
|
|
|
|
|
2014-10-12 18:05:03 +00:00
|
|
|
// open opens a topic for writing.
|
|
|
|
func (t *topic) open() error {
|
2014-10-24 04:22:52 +00:00
|
|
|
assert(t.file == nil, "topic already open: %d", t.id)
|
2014-10-12 18:05:03 +00:00
|
|
|
|
|
|
|
// Ensure the parent directory exists.
|
|
|
|
if err := os.MkdirAll(filepath.Dir(t.path), 0700); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Open the writer to the on-disk file.
|
|
|
|
f, err := os.OpenFile(t.path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
t.file = f
|
|
|
|
|
2014-10-03 03:13:42 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// close closes the underlying file.
|
2014-10-12 18:05:03 +00:00
|
|
|
func (t *topic) Close() error {
|
|
|
|
// Close file.
|
|
|
|
if t.file != nil {
|
|
|
|
_ = t.file.Close()
|
|
|
|
t.file = nil
|
|
|
|
}
|
|
|
|
return nil
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// writeTo writes the topic to a replica since a given index.
|
|
|
|
// Returns an error if the starting index is unavailable.
|
|
|
|
func (t *topic) writeTo(r *Replica, index uint64) (int, error) {
|
2014-10-12 18:05:03 +00:00
|
|
|
// TODO: If index is too old then return an error.
|
|
|
|
|
|
|
|
// Open topic file for reading.
|
2014-11-13 05:32:42 +00:00
|
|
|
// If it doesn't exist then just exit immediately.
|
2014-10-12 18:05:03 +00:00
|
|
|
f, err := os.Open(t.path)
|
2014-11-13 05:32:42 +00:00
|
|
|
if os.IsNotExist(err) {
|
|
|
|
return 0, nil
|
|
|
|
} else if err != nil {
|
2014-10-13 23:32:12 +00:00
|
|
|
return 0, err
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
defer func() { _ = f.Close() }()
|
|
|
|
|
|
|
|
// Stream out all messages until EOF.
|
2014-10-13 23:32:12 +00:00
|
|
|
total := 0
|
|
|
|
dec := NewMessageDecoder(bufio.NewReader(f))
|
2014-10-12 18:05:03 +00:00
|
|
|
for {
|
|
|
|
// Decode message.
|
|
|
|
var m Message
|
|
|
|
if err := dec.Decode(&m); err == io.EOF {
|
|
|
|
break
|
|
|
|
} else if err != nil {
|
2014-10-13 23:32:12 +00:00
|
|
|
return total, fmt.Errorf("decode: %s", err)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Ignore message if it's on or before high water mark.
|
|
|
|
if m.Index <= index {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write message out to stream.
|
2014-10-13 23:32:12 +00:00
|
|
|
n, err := m.WriteTo(r)
|
|
|
|
if err != nil {
|
|
|
|
return total, fmt.Errorf("write to: %s", err)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
2014-10-13 23:32:12 +00:00
|
|
|
total += n
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
return total, nil
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// encode writes a message to the end of the topic.
|
|
|
|
func (t *topic) encode(m *Message) error {
|
|
|
|
// Ensure the topic is open and ready for writing.
|
|
|
|
if t.file == nil {
|
|
|
|
if err := t.open(); err != nil {
|
|
|
|
return fmt.Errorf("open: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ensure message is in-order.
|
|
|
|
assert(m.Index > t.index, "topic message out of order: %d -> %d", t.index, m.Index)
|
|
|
|
|
|
|
|
// Encode message.
|
|
|
|
b := make([]byte, messageHeaderSize+len(m.Data))
|
2014-10-17 15:53:10 +00:00
|
|
|
copy(b, m.marshalHeader())
|
2014-10-12 18:05:03 +00:00
|
|
|
copy(b[messageHeaderSize:], m.Data)
|
|
|
|
|
|
|
|
// Write to topic file.
|
|
|
|
if _, err := t.file.Write(b); err != nil {
|
|
|
|
return fmt.Errorf("encode header: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Move up high water mark on the topic.
|
|
|
|
t.index = m.Index
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Write message out to all replicas.
|
|
|
|
for _, r := range t.replicas {
|
|
|
|
_, _ = r.Write(b)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-04 17:27:12 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Replica represents a collection of subscriptions to topics on the broker.
// The replica maintains the highest index read for each topic so that the
// broker can use this high water mark for trimming the topic logs (trimming
// is not implemented in this file).
type Replica struct {
	name   string
	url    *url.URL // TODO: not referenced by code visible here
	broker *Broker

	writer io.Writer     // currently attached writer; nil when detached
	done   chan struct{} // closed when the current writer is removed

	// Index for each subscribed topic, keyed by topic ID.
	// NOTE(review): values are set at subscribe/create time and are not
	// advanced by any code visible here — confirm intended semantics.
	topics map[uint64]uint64
}
|
|
|
|
|
|
|
|
// newReplica returns a new named Replica instance associated with a broker.
|
|
|
|
func newReplica(b *Broker, name string) *Replica {
|
|
|
|
return &Replica{
|
|
|
|
broker: b,
|
|
|
|
name: name,
|
2014-11-10 02:55:53 +00:00
|
|
|
topics: make(map[uint64]uint64),
|
2014-10-24 04:22:52 +00:00
|
|
|
}
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// closeWriter removes the writer on the replica and closes the notify channel.
|
|
|
|
func (r *Replica) closeWriter() {
|
|
|
|
if r.writer != nil {
|
|
|
|
r.writer = nil
|
|
|
|
close(r.done)
|
|
|
|
r.done = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-10-15 03:42:40 +00:00
|
|
|
// Topics returns a list of topic names that the replica is subscribed to.
|
2014-11-10 02:55:53 +00:00
|
|
|
func (r *Replica) Topics() []uint64 {
|
|
|
|
a := make([]uint64, 0, len(r.topics))
|
2014-10-24 04:22:52 +00:00
|
|
|
for topicID := range r.topics {
|
|
|
|
a = append(a, topicID)
|
2014-10-15 03:42:40 +00:00
|
|
|
}
|
2014-11-10 02:55:53 +00:00
|
|
|
sort.Sort(uint64Slice(a))
|
2014-10-15 03:42:40 +00:00
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Write writes a byte slice to the underlying writer.
|
|
|
|
// If no writer is available then ErrReplicaUnavailable is returned.
|
|
|
|
func (r *Replica) Write(p []byte) (int, error) {
|
|
|
|
// Check if there's a replica available.
|
|
|
|
if r.writer == nil {
|
|
|
|
return 0, errReplicaUnavailable
|
|
|
|
}
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// If an error occurs on the write then remove the writer.
|
|
|
|
n, err := r.writer.Write(p)
|
|
|
|
if err != nil {
|
|
|
|
r.closeWriter()
|
|
|
|
return n, errReplicaUnavailable
|
2014-10-04 17:27:12 +00:00
|
|
|
}
|
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
// If the writer has a flush method then call it.
|
|
|
|
if w, ok := r.writer.(flusher); ok {
|
|
|
|
w.Flush()
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
return n, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTo begins writing messages to a named stream.
|
|
|
|
// Only one writer is allowed on a stream at a time.
|
|
|
|
func (r *Replica) WriteTo(w io.Writer) (int, error) {
|
|
|
|
// Close previous writer, if set.
|
|
|
|
r.closeWriter()
|
|
|
|
|
|
|
|
// Set a new writer on the replica.
|
|
|
|
r.writer = w
|
|
|
|
done := make(chan struct{})
|
|
|
|
r.done = done
|
2014-10-12 18:05:03 +00:00
|
|
|
|
|
|
|
// Create a topic list with the "config" topic first.
|
2014-10-13 23:32:12 +00:00
|
|
|
// Configuration changes need to be propagated to make sure topics exist.
|
2014-11-10 02:55:53 +00:00
|
|
|
ids := make([]uint64, 0, len(r.topics))
|
2014-10-24 04:22:52 +00:00
|
|
|
for topicID := range r.topics {
|
|
|
|
ids = append(ids, topicID)
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
2014-11-10 02:55:53 +00:00
|
|
|
sort.Sort(uint64Slice(ids))
|
2014-10-12 18:05:03 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Catch up and attach replica to all subscribed topics.
|
2014-10-24 04:22:52 +00:00
|
|
|
for _, topicID := range ids {
|
2014-10-13 23:32:12 +00:00
|
|
|
// Find topic.
|
2014-10-24 04:22:52 +00:00
|
|
|
t := r.broker.topics[topicID]
|
|
|
|
assert(t != nil, "topic missing: %s", topicID)
|
2014-10-13 23:32:12 +00:00
|
|
|
|
|
|
|
// Write topic messages from last known index.
|
|
|
|
// Replica machine can ignore messages it already seen.
|
2014-10-24 04:22:52 +00:00
|
|
|
index := r.topics[topicID]
|
2014-10-13 23:32:12 +00:00
|
|
|
if _, err := t.writeTo(r, index); err != nil {
|
2014-11-13 05:32:42 +00:00
|
|
|
r.closeWriter()
|
2014-10-12 18:05:03 +00:00
|
|
|
return 0, fmt.Errorf("add stream writer: %s", err)
|
|
|
|
}
|
2014-10-13 23:32:12 +00:00
|
|
|
|
|
|
|
// Attach replica to topic to tail new messages.
|
|
|
|
t.replicas[r.name] = r
|
2014-10-12 18:05:03 +00:00
|
|
|
}
|
2014-10-04 17:27:12 +00:00
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// Wait for writer to close and then return.
|
|
|
|
<-done
|
2014-10-04 17:27:12 +00:00
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
2014-10-13 23:32:12 +00:00
|
|
|
// CreateReplicaCommand creates a new named replica.
// It is JSON-encoded into the Data of a CreateReplicaMessageType message.
type CreateReplicaCommand struct {
	Name string `json:"name"`
}

// DeleteReplicaCommand removes a replica by name.
// It is JSON-encoded into the Data of a DeleteReplicaMessageType message.
type DeleteReplicaCommand struct {
	Name string `json:"name"`
}

// SubscribeCommand subscribes a replica to a new topic.
// It is JSON-encoded into the Data of a SubscribeMessageType message.
type SubscribeCommand struct {
	Replica string `json:"replica"` // replica name
	TopicID uint64 `json:"topicID"` // topic id
}

// UnsubscribeCommand removes a subscription for a topic from a replica.
// It is JSON-encoded into the Data of an UnsubscribeMessageType message.
type UnsubscribeCommand struct {
	Replica string `json:"replica"` // replica name
	TopicID uint64 `json:"topicID"` // topic id
}
|
|
|
|
|
2014-10-15 03:42:40 +00:00
|
|
|
// MessageType represents the type of message.
|
|
|
|
type MessageType uint16
|
|
|
|
|
|
|
|
const (
|
2014-10-24 04:22:52 +00:00
|
|
|
BrokerMessageType = 0x8000
|
2014-10-15 03:42:40 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2014-10-24 04:22:52 +00:00
|
|
|
CreateReplicaMessageType = BrokerMessageType | MessageType(0x00)
|
|
|
|
DeleteReplicaMessageType = BrokerMessageType | MessageType(0x01)
|
2014-10-15 03:42:40 +00:00
|
|
|
|
2014-10-24 04:22:52 +00:00
|
|
|
SubscribeMessageType = BrokerMessageType | MessageType(0x10)
|
|
|
|
UnsubscribeMessageType = BrokerMessageType | MessageType(0x11)
|
2014-10-15 03:42:40 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// The size of the encoded message header, in bytes.
|
2014-11-10 02:55:53 +00:00
|
|
|
const messageHeaderSize = 2 + 8 + 8 + 4
|
2014-10-15 03:42:40 +00:00
|
|
|
|
|
|
|
// Message represents a single item in a topic.
|
|
|
|
type Message struct {
|
2014-10-17 15:53:10 +00:00
|
|
|
Type MessageType
|
2014-11-10 02:55:53 +00:00
|
|
|
TopicID uint64
|
2014-10-17 15:53:10 +00:00
|
|
|
Index uint64
|
|
|
|
Data []byte
|
2014-10-15 03:42:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTo encodes and writes the message to a writer. Implements io.WriterTo.
|
|
|
|
func (m *Message) WriteTo(w io.Writer) (n int, err error) {
|
2014-10-17 15:53:10 +00:00
|
|
|
if n, err := w.Write(m.marshalHeader()); err != nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
if n, err := w.Write(m.Data); err != nil {
|
|
|
|
return messageHeaderSize + n, err
|
|
|
|
}
|
|
|
|
return messageHeaderSize + len(m.Data), nil
|
|
|
|
}
|
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// MarshalBinary returns a binary representation of the message.
|
|
|
|
// This implements encoding.BinaryMarshaler. An error cannot be returned.
|
|
|
|
func (m *Message) MarshalBinary() ([]byte, error) {
|
|
|
|
b := make([]byte, messageHeaderSize+len(m.Data))
|
|
|
|
copy(b, m.marshalHeader())
|
|
|
|
copy(b[messageHeaderSize:], m.Data)
|
|
|
|
return b, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalBinary reads a message from a binary encoded slice.
|
|
|
|
// This implements encoding.BinaryUnmarshaler.
|
|
|
|
func (m *Message) UnmarshalBinary(b []byte) error {
|
|
|
|
m.unmarshalHeader(b)
|
|
|
|
if len(b[messageHeaderSize:]) < len(m.Data) {
|
|
|
|
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data))
|
|
|
|
}
|
|
|
|
copy(m.Data, b[messageHeaderSize:])
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// marshalHeader returns a byte slice with the message header.
|
|
|
|
func (m *Message) marshalHeader() []byte {
|
2014-10-15 03:42:40 +00:00
|
|
|
b := make([]byte, messageHeaderSize)
|
|
|
|
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
|
2014-11-10 02:55:53 +00:00
|
|
|
binary.BigEndian.PutUint64(b[2:10], m.TopicID)
|
|
|
|
binary.BigEndian.PutUint64(b[10:18], m.Index)
|
|
|
|
binary.BigEndian.PutUint32(b[18:22], uint32(len(m.Data)))
|
2014-10-15 03:42:40 +00:00
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// unmarshalHeader reads message header data from binary encoded slice.
|
|
|
|
// The data field is appropriately sized but is not filled.
|
|
|
|
func (m *Message) unmarshalHeader(b []byte) {
|
|
|
|
m.Type = MessageType(binary.BigEndian.Uint16(b[0:2]))
|
2014-11-10 02:55:53 +00:00
|
|
|
m.TopicID = binary.BigEndian.Uint64(b[2:10])
|
|
|
|
m.Index = binary.BigEndian.Uint64(b[10:18])
|
|
|
|
m.Data = make([]byte, binary.BigEndian.Uint32(b[18:22]))
|
2014-10-17 15:53:10 +00:00
|
|
|
}
|
|
|
|
|
2014-10-15 03:42:40 +00:00
|
|
|
// MessageDecoder decodes messages from a reader.
|
|
|
|
type MessageDecoder struct {
|
|
|
|
r io.Reader
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewMessageDecoder returns a new instance of the MessageDecoder.
|
|
|
|
func NewMessageDecoder(r io.Reader) *MessageDecoder {
|
|
|
|
return &MessageDecoder{r: r}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Decode reads a message from the decoder's reader.
|
|
|
|
func (dec *MessageDecoder) Decode(m *Message) error {
|
2014-10-17 15:53:10 +00:00
|
|
|
// Read header bytes.
|
2014-10-15 03:42:40 +00:00
|
|
|
var b [messageHeaderSize]byte
|
|
|
|
if _, err := io.ReadFull(dec.r, b[:]); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2014-10-17 15:53:10 +00:00
|
|
|
m.unmarshalHeader(b[:])
|
2014-10-15 03:42:40 +00:00
|
|
|
|
|
|
|
// Read data.
|
2014-10-17 15:53:10 +00:00
|
|
|
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
|
2014-10-15 03:42:40 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
// flusher is satisfied by writers that expose a Flush method.
// Replica.Write calls Flush after each successful write when its attached
// writer implements this interface.
type flusher interface {
	Flush()
}
|
|
|
|
|
2014-11-10 02:55:53 +00:00
|
|
|
// uint64Slice implements sort.Interface for []uint64, sorting in increasing
// order.
type uint64Slice []uint64

// Len returns the number of elements.
func (s uint64Slice) Len() int { return len(s) }

// Less reports whether element i sorts before element j.
func (s uint64Slice) Less(i, j int) bool { return s[j] > s[i] }

// Swap exchanges elements i and j.
func (s uint64Slice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
|
2014-10-24 04:22:52 +00:00
|
|
|
|
2014-11-05 05:32:17 +00:00
|
|
|
// mustMarshalJSON returns the JSON encoding of v.
// It panics with a "marshal: "-prefixed message if encoding fails; use it only
// internally, where a failed marshal indicates corruption and a panic is
// appropriate.
func mustMarshalJSON(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err == nil {
		return b
	}
	panic("marshal: " + err.Error())
}
|
|
|
|
|
2014-11-05 05:32:17 +00:00
|
|
|
// mustUnmarshalJSON decodes the JSON in b into v.
// It panics with an "unmarshal: "-prefixed message if decoding fails; use it
// only internally, where a failed unmarshal indicates corruption and a panic
// is appropriate.
func mustUnmarshalJSON(b []byte, v interface{}) {
	err := json.Unmarshal(b, v)
	if err == nil {
		return
	}
	panic("unmarshal: " + err.Error())
}
|
|
|
|
|
2014-10-03 03:13:42 +00:00
|
|
|
// assert panics with "assert failed: " followed by msg formatted with v when
// condition is false. It does nothing when condition is true.
func assert(condition bool, msg string, v ...interface{}) {
	if condition {
		return
	}
	panic(fmt.Sprintf("assert failed: "+msg, v...))
}
|
2014-10-12 18:05:03 +00:00
|
|
|
|
|
|
|
// warn writes its operands to stderr separated by spaces and followed by a
// newline.
func warn(v ...interface{}) {
	fmt.Fprintln(os.Stderr, v...)
}

// warnf writes msg formatted with v, followed by a newline, to stderr.
func warnf(msg string, v ...interface{}) {
	fmt.Fprintf(os.Stderr, msg+"\n", v...)
}
|