Basic broker workflow.
parent
37b11da4e8
commit
accdfa33e4
|
@ -0,0 +1,18 @@
|
|||
Broker
|
||||
======
|
||||
|
||||
## Uncompleted
|
||||
|
||||
- [ ] Broker FSM snapshotting
|
||||
- [ ] Topic truncation
|
||||
|
||||
|
||||
## Completed
|
||||
|
||||
- [x] Message encoding
|
||||
- [x] Wrap broker over raft log
|
||||
- [x] Topic file writer
|
||||
- [x] Topic stream writers
|
||||
- [x] Broker publishing
|
||||
- [x] Config topic
|
||||
- [x] Stream topic from index
|
668
broker/broker.go
668
broker/broker.go
|
@ -3,32 +3,41 @@ package broker
|
|||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdb/influxdb/raft"
|
||||
)
|
||||
|
||||
const configTopicID = uint32(0)
|
||||
const configTopicName = "config"
|
||||
|
||||
// Broker represents distributed messaging system segmented into topics.
|
||||
// Each topic represents a linear series of events.
|
||||
type Broker struct {
|
||||
mu sync.Mutex
|
||||
path string // data directory
|
||||
mu sync.RWMutex
|
||||
path string // data directory
|
||||
log *raft.Log // internal raft log
|
||||
|
||||
log *raft.Log // internal raft log
|
||||
topics map[string]*topic // topic writers by path
|
||||
streams map[string]*Stream // stream by name
|
||||
|
||||
maxTopicID uint32 // autoincrementing sequence
|
||||
topics map[uint32]*topic // topics by id
|
||||
topicsByName map[string]*topic // topics by name
|
||||
}
|
||||
|
||||
// New returns a new instance of a Broker with default values.
|
||||
func New() *Broker {
|
||||
b := &Broker{
|
||||
log: raft.NewLog(),
|
||||
topics: make(map[string]*topic),
|
||||
streams: make(map[string]*Stream),
|
||||
log: raft.NewLog(),
|
||||
streams: make(map[string]*Stream),
|
||||
topics: make(map[uint32]*topic),
|
||||
topicsByName: make(map[string]*topic),
|
||||
}
|
||||
b.log.FSM = (*brokerFSM)(b)
|
||||
return b
|
||||
|
@ -48,11 +57,12 @@ func (b *Broker) Open(path string) error {
|
|||
|
||||
// Require a non-blank path.
|
||||
if path == "" {
|
||||
return fmt.Errorf("path required")
|
||||
return ErrPathRequired
|
||||
}
|
||||
b.path = path
|
||||
|
||||
// TODO(wal): Initialize all topics.
|
||||
// Initialize config topic.
|
||||
b.initTopic(configTopicID, configTopicName)
|
||||
|
||||
// Open underlying raft log.
|
||||
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
|
||||
|
@ -69,11 +79,11 @@ func (b *Broker) Close() error {
|
|||
|
||||
// Return error if the broker is already closed.
|
||||
if !b.opened() {
|
||||
return fmt.Errorf("broker closed")
|
||||
return ErrClosed
|
||||
}
|
||||
b.path = ""
|
||||
|
||||
// TODO(wal): Close all topics.
|
||||
// TODO: Close all topics.
|
||||
|
||||
// Close raft log.
|
||||
_ = b.log.Close()
|
||||
|
@ -92,17 +102,33 @@ func (b *Broker) Initialize() error {
|
|||
// Publish writes a message to a topic.
|
||||
// Returns the index of the message. Otherwise returns an error.
|
||||
func (b *Broker) Publish(topic string, m *Message) (uint64, error) {
|
||||
assert(len(topic) < 256, "topic too long: %s", topic)
|
||||
// Retrieve topic by name.
|
||||
b.mu.RLock()
|
||||
t := b.topicsByName[topic]
|
||||
b.mu.RUnlock()
|
||||
|
||||
// Encode type, topic, and data together.
|
||||
buf := make([]byte, 2+1+len(topic)+len(m.Data))
|
||||
binary.BigEndian.PutUint16(buf, uint16(m.Type))
|
||||
buf[2] = byte(len(topic))
|
||||
copy(buf[3:], []byte(topic))
|
||||
copy(buf[3+len(topic):], m.Data)
|
||||
// Ensure topic exists.
|
||||
if t == nil {
|
||||
return 0, errors.New("topic not found")
|
||||
}
|
||||
|
||||
return b.log.Apply(encodeTopicMessage(t.id, m))
|
||||
}
|
||||
|
||||
// publishConfig writes a configuration change to the config topic.
|
||||
// This always waits until the change is applied to the log.
|
||||
func (b *Broker) publishConfig(m *Message) error {
|
||||
// Encode topic, type, and data together.
|
||||
buf := encodeTopicMessage(configTopicID, m)
|
||||
|
||||
// Apply to the raft log.
|
||||
return b.log.Apply(buf)
|
||||
index, err := b.log.Apply(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for index.
|
||||
return b.log.Wait(index)
|
||||
}
|
||||
|
||||
// Wait pauses until the given index has been applied.
|
||||
|
@ -112,62 +138,228 @@ func (b *Broker) Wait(index uint64) error {
|
|||
|
||||
// Stream returns a stream by name.
|
||||
func (b *Broker) Stream(name string) *Stream {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
return b.streams[name]
|
||||
}
|
||||
|
||||
// CreateTopic creates a new topic.
|
||||
func (b *Broker) CreateTopic(name string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Ensure topic doesn't already exist.
|
||||
t := b.topicsByName[name]
|
||||
if t != nil {
|
||||
return errors.New("topic already exists")
|
||||
}
|
||||
|
||||
// Generate the next id.
|
||||
id := b.maxTopicID + 1
|
||||
|
||||
// Add command to create the topic.
|
||||
return b.publishConfig(&Message{
|
||||
Type: CreateTopicMessageType,
|
||||
Data: jsonify(&CreateTopicCommand{ID: id, Name: name}),
|
||||
})
|
||||
}
|
||||
|
||||
// applyCreateTopic is called when the CreateTopicCommand is applied.
|
||||
func (b *Broker) applyCreateTopic(id uint32, name string) {
|
||||
assert(b.topics[id] == nil, "duplicate topic id exists: %d", id)
|
||||
assert(b.topicsByName[name] == nil, "duplicate topic name exists: %s", name)
|
||||
b.initTopic(id, name)
|
||||
}
|
||||
|
||||
// initializes a new topic object.
|
||||
func (b *Broker) initTopic(id uint32, name string) {
|
||||
t := &topic{
|
||||
id: id,
|
||||
name: name,
|
||||
path: filepath.Join(b.path, name),
|
||||
writers: make(map[string]*streamWriter),
|
||||
}
|
||||
b.topics[t.id] = t
|
||||
b.topicsByName[t.name] = t
|
||||
}
|
||||
|
||||
// DeleteTopic deletes an existing topic.
|
||||
func (b *Broker) DeleteTopic(name string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Find topic.
|
||||
if t := b.topicsByName[name]; t == nil {
|
||||
return errors.New("topic not found")
|
||||
}
|
||||
|
||||
// Add command to remove.
|
||||
return b.publishConfig(&Message{
|
||||
Type: DeleteTopicMessageType,
|
||||
Data: jsonify(&DeleteTopicCommand{Name: name}),
|
||||
})
|
||||
}
|
||||
|
||||
// applyDeleteTopic is called when the DeleteTopicCommand is applied.
|
||||
func (b *Broker) applyDeleteTopic(name string) {
|
||||
t := b.topicsByName[name]
|
||||
assert(t != nil, "topic missing: %s", name)
|
||||
|
||||
// Close topic.
|
||||
_ = t.Close()
|
||||
|
||||
// Remove subscriptions.
|
||||
for _, s := range b.streams {
|
||||
delete(s.topics, name)
|
||||
}
|
||||
|
||||
// Remove from lookups.
|
||||
delete(b.topics, t.id)
|
||||
delete(b.topicsByName, t.name)
|
||||
}
|
||||
|
||||
// CreateStream creates a new named stream.
|
||||
func (b *Broker) CreateStream(name string) error {
|
||||
// Create message.
|
||||
var m Message
|
||||
m.Type = createStreamMessageType
|
||||
m.Data, _ = json.Marshal(&createStream{Name: name})
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Write to the log.
|
||||
index, err := b.Publish("config", &m)
|
||||
if err != nil {
|
||||
return err
|
||||
// Ensure stream doesn't already exist.
|
||||
s := b.streams[name]
|
||||
if s != nil {
|
||||
return errors.New("stream already exists")
|
||||
}
|
||||
|
||||
// Wait until applied.
|
||||
return b.log.Wait(index)
|
||||
// Add command to create stream.
|
||||
return b.publishConfig(&Message{
|
||||
Type: CreateStreamMessageType,
|
||||
Data: jsonify(&CreateStreamCommand{Name: name}),
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveStream deletes an existing stream by name.
|
||||
func (b *Broker) RemoveStream(name string) error {
|
||||
// TODO: Add DeleteStream command to the log.
|
||||
// TODO: Wait until applied.
|
||||
// applyCreateStream is called when the CreateStreamCommand is applied.
|
||||
func (b *Broker) applyCreateStream(name string) {
|
||||
s := &Stream{
|
||||
broker: b,
|
||||
name: name,
|
||||
topics: make(map[string]uint64),
|
||||
}
|
||||
|
||||
// Automatically subscribe to the config topic.
|
||||
t := b.topics[configTopicID]
|
||||
assert(t != nil, "config topic missing")
|
||||
s.topics[configTopicName] = t.index
|
||||
|
||||
// Add stream to the broker.
|
||||
b.streams[name] = s
|
||||
}
|
||||
|
||||
// DeleteStream deletes an existing stream by name.
|
||||
func (b *Broker) DeleteStream(name string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Ensure stream exists.
|
||||
if s := b.streams[name]; s == nil {
|
||||
return errors.New("stream not found")
|
||||
}
|
||||
|
||||
// Issue command to remove stream.
|
||||
return b.publishConfig(&Message{
|
||||
Type: DeleteStreamMessageType,
|
||||
Data: jsonify(&DeleteStreamCommand{Name: name}),
|
||||
})
|
||||
}
|
||||
|
||||
// applyDeleteStream is called when the DeleteStreamCommand is applied.
|
||||
func (b *Broker) applyDeleteStream(name string) {
|
||||
panic("not yet implemented")
|
||||
}
|
||||
|
||||
// Subscribe adds a subscription to a topic from a stream.
|
||||
func (b *Broker) Subscribe(stream string, topic string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Ensure stream & topic exist.
|
||||
if b.streams[stream] == nil {
|
||||
return errors.New("stream not found")
|
||||
} else if b.topicsByName[topic] == nil {
|
||||
return errors.New("topic not found")
|
||||
}
|
||||
|
||||
// Issue command to subscribe to topic.
|
||||
return b.publishConfig(&Message{
|
||||
Type: SubscribeMessageType,
|
||||
Data: jsonify(&SubscribeCommand{Stream: stream, Topic: topic}),
|
||||
})
|
||||
}
|
||||
|
||||
// applySubscribe is called when the SubscribeCommand is applied.
|
||||
func (b *Broker) applySubscribe(stream string, topic string) error {
|
||||
// Retrieve stream.
|
||||
s := b.streams[stream]
|
||||
assert(s != nil, "stream not found: %s", stream)
|
||||
|
||||
// Retrieve topic.
|
||||
t := b.topicsByName[topic]
|
||||
assert(t != nil, "topic not found: %s", topic)
|
||||
|
||||
// Save current index on topic.
|
||||
index := t.index
|
||||
|
||||
// Ensure topic is not already subscribed to.
|
||||
if _, ok := s.topics[t.name]; ok {
|
||||
warn("already subscribed to topic")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add subscription to stream.
|
||||
s.topics[t.name] = index
|
||||
|
||||
// Add tailing stream writer to topic.
|
||||
if s.writer != nil {
|
||||
if err := t.addWriter(s.name, s.writer, index); err != nil {
|
||||
return fmt.Errorf("add writer: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns the topic by name. Creates it if it doesn't exist.
|
||||
func (b *Broker) createTopicIfNotExists(name string) (*topic, error) {
|
||||
// Return it if it already exists.
|
||||
if t := b.topics[name]; t != nil {
|
||||
return t, nil
|
||||
// Unsubscribe removes a subscription for a topic from a stream.
|
||||
func (b *Broker) Unsubscribe(stream string, topic string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Ensure stream & topic exist.
|
||||
if b.streams[stream] == nil {
|
||||
return errors.New("stream not found")
|
||||
} else if b.topicsByName[topic] == nil {
|
||||
return errors.New("topic not found")
|
||||
}
|
||||
|
||||
// TODO: Disallow names starting with "raft" or containing "." or ".."
|
||||
// Issue command to unsubscribe from topic.
|
||||
return b.publishConfig(&Message{
|
||||
Type: UnsubscribeMessageType,
|
||||
Data: jsonify(&UnsubscribeCommand{Stream: stream, Topic: topic}),
|
||||
})
|
||||
}
|
||||
|
||||
// Otherwise create it.
|
||||
t := &topic{name: name}
|
||||
// applyUnsubscribe is called when the UnsubscribeCommand is applied.
|
||||
func (b *Broker) applyUnsubscribe(stream string, topic string) {
|
||||
// Retrieve stream.
|
||||
s := b.streams[stream]
|
||||
assert(s != nil, "stream not found: %s", stream)
|
||||
|
||||
// Ensure the parent directory exists.
|
||||
path := filepath.Join(b.path, name)
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Retrieve topic.
|
||||
t := b.topicsByName[topic]
|
||||
assert(t != nil, "topic not found: %d", topic)
|
||||
|
||||
// Open the writer to the on-disk file.
|
||||
f, err := os.OpenFile(filepath.Join(b.path, name), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.w = f
|
||||
// Remove subscription.
|
||||
delete(s.topics, topic)
|
||||
|
||||
// Cache the topic on the broker.
|
||||
b.topics[name] = t
|
||||
|
||||
return t, nil
|
||||
// Remove stream writer.
|
||||
t.removeWriter(stream)
|
||||
}
|
||||
|
||||
// brokerFSM implements the raft.FSM interface for the broker.
|
||||
|
@ -184,54 +376,64 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Extract topic from Raft entry.
|
||||
// The third byte is the topic length.
|
||||
// The fourth byte is the start of the topic.
|
||||
sz := e.Data[2]
|
||||
topic := string(e.Data[3 : 3+sz])
|
||||
|
||||
// Create a message from the entry data.
|
||||
var m Message
|
||||
m.Type = MessageType(binary.BigEndian.Uint16(e.Data))
|
||||
// Decode the topic message from the raft log.
|
||||
topicID, m := decodeTopicMessage(e.Data)
|
||||
m.Index = e.Index
|
||||
m.Data = e.Data[3+sz:]
|
||||
|
||||
// Find topic by id. Ignore topic if it doesn't exist.
|
||||
t := b.topics[topicID]
|
||||
assert(t != nil, "topic not found: %d", topicID)
|
||||
|
||||
// Update the broker configuration.
|
||||
var err error
|
||||
switch m.Type {
|
||||
case createStreamMessageType:
|
||||
err = fsm.applyCreateStream(&m)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
case CreateTopicMessageType:
|
||||
var c CreateTopicCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal create topic: %s", err)
|
||||
}
|
||||
b.applyCreateTopic(c.ID, c.Name)
|
||||
|
||||
case DeleteTopicMessageType:
|
||||
var c DeleteTopicCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal delete topic: %s", err)
|
||||
}
|
||||
b.applyDeleteTopic(c.Name)
|
||||
|
||||
case CreateStreamMessageType:
|
||||
var c CreateStreamCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal create stream: %s", err)
|
||||
}
|
||||
b.applyCreateStream(c.Name)
|
||||
|
||||
case DeleteStreamMessageType:
|
||||
var c DeleteStreamCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal delete stream: %s", err)
|
||||
}
|
||||
b.applyDeleteStream(c.Name)
|
||||
|
||||
case SubscribeMessageType:
|
||||
var c SubscribeCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal subscribe: %s", err)
|
||||
}
|
||||
b.applySubscribe(c.Stream, c.Topic)
|
||||
|
||||
case UnsubscribeMessageType:
|
||||
var c UnsubscribeCommand
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return fmt.Errorf("unmarshal unsubscribe: %s", err)
|
||||
}
|
||||
b.applyUnsubscribe(c.Stream, c.Topic)
|
||||
}
|
||||
|
||||
// Retrieve the topic.
|
||||
t, err := b.createTopicIfNotExists(topic)
|
||||
if err != nil {
|
||||
return err
|
||||
// Write to the topic.
|
||||
if err := t.encode(m); err != nil {
|
||||
return fmt.Errorf("encode: %s", err)
|
||||
}
|
||||
|
||||
// Write message to the topic.
|
||||
if err := t.writeMessage(&m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyCreateStream processes a createStream message.
|
||||
func (fsm *brokerFSM) applyCreateStream(m *Message) error {
|
||||
var c createStream
|
||||
if err := json.Unmarshal(m.Data, &c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new named stream.
|
||||
fsm.streams[c.Name] = &Stream{
|
||||
broker: (*Broker)(fsm),
|
||||
subscriptions: make(map[string]*subscription),
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -257,17 +459,169 @@ func (fsm *brokerFSM) Restore(r io.Reader) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// encodes a topic id and message together for the raft log.
|
||||
func encodeTopicMessage(topicID uint32, m *Message) []byte {
|
||||
b := make([]byte, 4+2+len(m.Data))
|
||||
binary.BigEndian.PutUint32(b, topicID)
|
||||
binary.BigEndian.PutUint16(b[4:6], uint16(m.Type))
|
||||
copy(b[6:], m.Data)
|
||||
return b
|
||||
}
|
||||
|
||||
// decodes a topic id and message together from the raft log.
|
||||
func decodeTopicMessage(b []byte) (topicID uint32, m *Message) {
|
||||
topicID = binary.BigEndian.Uint32(b[0:4])
|
||||
m = &Message{
|
||||
Type: MessageType(binary.BigEndian.Uint16(b[4:6])),
|
||||
Data: b[6:],
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// topic represents a single named stream of messages.
|
||||
// Each topic is identified by a unique path.
|
||||
type topic struct {
|
||||
name string // unique identifier (and on-disk path)
|
||||
w io.Writer // on-disk representation
|
||||
enc *MessageEncoder
|
||||
mu sync.RWMutex
|
||||
id uint32 // unique identifier
|
||||
name string // unique name
|
||||
index uint64 // highest index written
|
||||
path string // on-disk path
|
||||
|
||||
file *os.File // on-disk representation
|
||||
|
||||
writers map[string]*streamWriter // tailing stream writers by stream name
|
||||
}
|
||||
|
||||
// write writes a message to the end of the topic.
|
||||
func (t *topic) writeMessage(m *Message) error {
|
||||
// TODO
|
||||
// open opens a topic for writing.
|
||||
func (t *topic) open() error {
|
||||
if t.file != nil {
|
||||
return fmt.Errorf("topic already open")
|
||||
}
|
||||
|
||||
// Ensure the parent directory exists.
|
||||
if err := os.MkdirAll(filepath.Dir(t.path), 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open the writer to the on-disk file.
|
||||
f, err := os.OpenFile(t.path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.file = f
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// close closes the underlying file and all stream writers.
|
||||
func (t *topic) Close() error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
// Close file.
|
||||
if t.file != nil {
|
||||
_ = t.file.Close()
|
||||
t.file = nil
|
||||
}
|
||||
|
||||
// Close all stream writers.
|
||||
for _, w := range t.writers {
|
||||
_ = w.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// addWriter catches up a stream writer starting from the given index
|
||||
// and adds the writer to the list of caught up writers.
|
||||
func (t *topic) addWriter(name string, w *streamWriter, index uint64) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
// Remove previous stream writer.
|
||||
t.removeWriter(name)
|
||||
|
||||
// TODO: If index is too old then return an error.
|
||||
|
||||
// TODO: Only catch up if index is behind head.
|
||||
|
||||
// Open topic file for reading.
|
||||
f, err := os.Open(t.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
|
||||
// Stream out all messages until EOF.
|
||||
dec := NewMessageDecoder(f)
|
||||
for {
|
||||
// Decode message.
|
||||
var m Message
|
||||
if err := dec.Decode(&m); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("decode: %s", err)
|
||||
}
|
||||
|
||||
// Ignore message if it's on or before high water mark.
|
||||
if m.Index <= index {
|
||||
continue
|
||||
}
|
||||
|
||||
// Write message out to stream.
|
||||
if _, err := m.WriteTo(w); err != nil {
|
||||
return fmt.Errorf("write to: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add writer to tail the the topic.
|
||||
t.writers[name] = w
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeWriter removes a steam writer from the tailing writers on the topic.
|
||||
func (t *topic) removeWriter(name string) {
|
||||
if w := t.writers[name]; w != nil {
|
||||
_ = w.Close()
|
||||
delete(t.writers, name)
|
||||
}
|
||||
}
|
||||
|
||||
// encode writes a message to the end of the topic.
|
||||
func (t *topic) encode(m *Message) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
// Ensure the topic is open and ready for writing.
|
||||
if t.file == nil {
|
||||
if err := t.open(); err != nil {
|
||||
return fmt.Errorf("open: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure message is in-order.
|
||||
assert(m.Index > t.index, "topic message out of order: %d -> %d", t.index, m.Index)
|
||||
|
||||
// Encode message.
|
||||
b := make([]byte, messageHeaderSize+len(m.Data))
|
||||
copy(b, m.header())
|
||||
copy(b[messageHeaderSize:], m.Data)
|
||||
|
||||
// Write to topic file.
|
||||
if _, err := t.file.Write(b); err != nil {
|
||||
return fmt.Errorf("encode header: %s", err)
|
||||
}
|
||||
|
||||
// Move up high water mark on the topic.
|
||||
t.index = m.Index
|
||||
|
||||
// Write message out to all attached stream writers.
|
||||
for name, w := range t.writers {
|
||||
if _, err := w.Write(b); err != nil {
|
||||
t.removeWriter(name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -275,46 +629,68 @@ func (t *topic) writeMessage(m *Message) error {
|
|||
// The stream maintains the highest index read for each topic so that the
|
||||
// broker can use this high water mark for trimming the topic logs.
|
||||
type Stream struct {
|
||||
broker *Broker
|
||||
w *streamWriter
|
||||
subscriptions map[string]*subscription
|
||||
name string
|
||||
broker *Broker
|
||||
writer *streamWriter
|
||||
|
||||
topics map[string]uint64 // current index for each subscribed topic
|
||||
}
|
||||
|
||||
// Subscribe subscribes the stream to a given topic.
|
||||
func (s *Stream) Subscribe(topic string, index uint64) error {
|
||||
// TODO: Add Subscribe command to the log.
|
||||
// TODO: Wait until applied.
|
||||
return nil
|
||||
}
|
||||
// Subscribe adds a subscription to a topic for the stream.
|
||||
func (s *Stream) Subscribe(topic string) error { return s.broker.Subscribe(s.name, topic) }
|
||||
|
||||
// Unsubscribe removes a subscription from the stream to a given topic.
|
||||
func (s *Stream) Unsubscribe(topic string) error {
|
||||
// TODO: Add Unsubscribe command to the log.
|
||||
// TODO: Wait until applied.
|
||||
return nil
|
||||
}
|
||||
// Unsubscribe removes a subscription from the stream.
|
||||
func (s *Stream) Unsubscribe(topic string) error { return s.broker.Unsubscribe(s.name, topic) }
|
||||
|
||||
// WriteTo begins writing messages to a named stream.
|
||||
// Only one writer is allowed on a stream at a time.
|
||||
func (s *Stream) WriteTo(w io.Writer) (int, error) {
|
||||
writer := newStreamWriter(w)
|
||||
|
||||
// Close existing writer on stream.
|
||||
if s.w != nil {
|
||||
s.w.Close()
|
||||
s.w = nil
|
||||
if s.writer != nil {
|
||||
s.writer.Close()
|
||||
s.writer = nil
|
||||
}
|
||||
|
||||
// Set a new writer on the stream.
|
||||
s.w = &streamWriter{w: w, done: make(chan struct{})}
|
||||
s.writer = writer
|
||||
|
||||
// TODO: Return bytes written.
|
||||
// Create a topic list with the "config" topic first.
|
||||
names := make([]string, 1, len(s.topics))
|
||||
names[0] = configTopicName
|
||||
for name := range s.topics {
|
||||
if name != configTopicName {
|
||||
names = append(names, name)
|
||||
}
|
||||
}
|
||||
sort.Strings(names[1:])
|
||||
|
||||
// Catch up and attach writer to all subscribed topics.
|
||||
for _, name := range names {
|
||||
t := s.broker.topicsByName[name]
|
||||
assert(t != nil, "subscription topic missing: %s", name)
|
||||
if err := t.addWriter(s.name, writer, s.topics[name]); err != nil {
|
||||
return 0, fmt.Errorf("add stream writer: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for writer to close and return.
|
||||
<-writer.done
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// streamWriter represents a writer that blocks Stream.WriteTo until it's closed.
|
||||
type streamWriter struct {
|
||||
w io.Writer
|
||||
io.Writer
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// newStreamWriter returns an instance of streamWriter that wraps an io.Writer.
|
||||
func newStreamWriter(w io.Writer) *streamWriter {
|
||||
return &streamWriter{Writer: w, done: make(chan struct{})}
|
||||
}
|
||||
|
||||
// Close closes the writer.
|
||||
func (w *streamWriter) Close() error {
|
||||
if w.done != nil {
|
||||
|
@ -324,10 +700,45 @@ func (w *streamWriter) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// subscription represents a single topic subscription for a stream.
|
||||
type subscription struct {
|
||||
name string // topic name
|
||||
index uint64 // highest index received
|
||||
// CreateTopicCommand creates a new named topic.
|
||||
type CreateTopicCommand struct {
|
||||
ID uint32 `json:"id"` // topic id
|
||||
Name string `json:"name"` // topic name
|
||||
}
|
||||
|
||||
// DeleteTopicCommand removes a topic by ID.
|
||||
type DeleteTopicCommand struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// CreateStream creates a new named stream.
|
||||
type CreateStreamCommand struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// DeleteStreamCommand removes a stream by name.
|
||||
type DeleteStreamCommand struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// SubscribeCommand subscribes a stream to a new topic.
|
||||
type SubscribeCommand struct {
|
||||
Stream string `json:"stream"` // stream name
|
||||
Topic string `json:"topic"` // topic name
|
||||
}
|
||||
|
||||
// UnsubscribeCommand removes a subscription for a topic from a stream.
|
||||
type UnsubscribeCommand struct {
|
||||
Stream string `json:"stream"` // stream name
|
||||
Topic string `json:"topic"` // topic name
|
||||
}
|
||||
|
||||
// jsonify marshals a value to a JSON-encoded byte slice.
|
||||
// This should only be used with internal data that will not return marshal errors.
|
||||
func jsonify(v interface{}) []byte {
|
||||
b, err := json.Marshal(v)
|
||||
assert(err == nil, "json marshal error: %s", err)
|
||||
return b
|
||||
}
|
||||
|
||||
// assert will panic with a given formatted message if the given condition is false.
|
||||
|
@ -336,3 +747,6 @@ func assert(condition bool, msg string, v ...interface{}) {
|
|||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
||||
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
|
||||
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
|
||||
|
|
|
@ -2,6 +2,7 @@ package broker_test
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -11,48 +12,77 @@ import (
|
|||
"github.com/influxdb/influxdb/broker"
|
||||
)
|
||||
|
||||
// Ensure that opening a broker without a path returns an error.
|
||||
func TestBroker_Open_ErrPathRequired(t *testing.T) {
|
||||
b := broker.New()
|
||||
if err := b.Open(""); err != broker.ErrPathRequired {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that closing an already closed broker returns an error.
|
||||
func TestBroker_Close_ErrClosed(t *testing.T) {
|
||||
b := NewBroker()
|
||||
b.Close()
|
||||
if err := b.Broker.Close(); err != broker.ErrClosed {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the broker can write messages to the appropriate topics.
|
||||
func TestBroker_Write(t *testing.T) {
|
||||
b := NewBroker()
|
||||
defer b.Close()
|
||||
|
||||
// Create a topic.
|
||||
if err := b.CreateTopic("foo/bar"); err != nil {
|
||||
t.Fatalf("create topic error: %s", err)
|
||||
}
|
||||
|
||||
// Create a new named stream.
|
||||
if err := b.CreateStream("node0"); err != nil {
|
||||
t.Fatalf("create stream: %s", err)
|
||||
}
|
||||
|
||||
// Subscribe stream to the foo/bar topic.
|
||||
if err := b.Subscribe("node0", "foo/bar"); err != nil {
|
||||
t.Fatalf("subscribe: %s", err)
|
||||
}
|
||||
|
||||
// Write a message to the broker.
|
||||
index, err := b.Publish("foo/bar", &broker.Message{Type: 100, Data: []byte("0000")})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if index != 2 {
|
||||
} else if index != 5 {
|
||||
t.Fatalf("unexpected index: %d", index)
|
||||
}
|
||||
if err := b.Wait(index); err != nil {
|
||||
t.Fatalf("wait error: %s", err)
|
||||
}
|
||||
|
||||
// Create a new named stream and subscription.
|
||||
if err := b.CreateStream("node0"); err != nil {
|
||||
t.Fatalf("create stream: %s", err)
|
||||
}
|
||||
|
||||
// Retrieve stream and subscribe.
|
||||
s := b.Stream("node0")
|
||||
if err := s.Subscribe("foo/bar", 0); err != nil {
|
||||
t.Fatalf("subscribe: %s", err)
|
||||
}
|
||||
|
||||
// Read message from the stream.
|
||||
var buf bytes.Buffer
|
||||
go func() {
|
||||
if _, err := s.WriteTo(&buf); err != nil {
|
||||
if _, err := b.Stream("node0").WriteTo(&buf); err != nil {
|
||||
t.Fatalf("write to: %s", err)
|
||||
}
|
||||
}()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Read out the message.
|
||||
// Read out the config messages first.
|
||||
var m broker.Message
|
||||
dec := broker.NewMessageDecoder(&buf)
|
||||
if err := dec.Decode(&m); err != nil || m.Type != broker.CreateStreamMessageType {
|
||||
t.Fatalf("decode(create stream): %x (%v)", m.Type, err)
|
||||
}
|
||||
if err := dec.Decode(&m); err != nil || m.Type != broker.SubscribeMessageType {
|
||||
t.Fatalf("decode(subscribe): %x (%v)", m.Type, err)
|
||||
}
|
||||
|
||||
// Read out the published message.
|
||||
if err := dec.Decode(&m); err != nil {
|
||||
t.Fatalf("decode: %s", err)
|
||||
} else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 2, Data: []byte("0000")}) {
|
||||
} else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 5, Data: []byte("0000")}) {
|
||||
t.Fatalf("unexpected message: %#v", &m)
|
||||
}
|
||||
}
|
||||
|
@ -88,3 +118,6 @@ func tempfile() string {
|
|||
os.Remove(path)
|
||||
return path
|
||||
}
|
||||
|
||||
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
|
||||
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
|
||||
|
|
|
@ -3,6 +3,12 @@ package broker
|
|||
import "errors"
|
||||
|
||||
var (
|
||||
// ErrPathRequired is returned when opening a broker without a path.
|
||||
ErrPathRequired = errors.New("path required")
|
||||
|
||||
// ErrClosed is returned when closing a broker that's already closed.
|
||||
ErrClosed = errors.New("broker already closed")
|
||||
|
||||
// ErrSubscribed is returned when a stream is already subscribed to a topic.
|
||||
ErrSubscribed = errors.New("already subscribed")
|
||||
)
|
||||
|
|
|
@ -9,11 +9,18 @@ import (
|
|||
type MessageType uint16
|
||||
|
||||
const (
|
||||
BrokerMessageType = 1 << 15
|
||||
ConfigMessageType = 1 << 15
|
||||
)
|
||||
|
||||
const (
|
||||
createStreamMessageType = BrokerMessageType | MessageType(0)
|
||||
CreateTopicMessageType = ConfigMessageType | MessageType(0x00)
|
||||
DeleteTopicMessageType = ConfigMessageType | MessageType(0x01)
|
||||
|
||||
CreateStreamMessageType = ConfigMessageType | MessageType(0x10)
|
||||
DeleteStreamMessageType = ConfigMessageType | MessageType(0x11)
|
||||
|
||||
SubscribeMessageType = ConfigMessageType | MessageType(0x20)
|
||||
UnsubscribeMessageType = ConfigMessageType | MessageType(0x21)
|
||||
)
|
||||
|
||||
// Message represents a single item in a topic.
|
||||
|
@ -23,42 +30,29 @@ type Message struct {
|
|||
Data []byte
|
||||
}
|
||||
|
||||
// createStream creates a new named stream.
|
||||
type createStream struct {
|
||||
Name string `json:"name"`
|
||||
// WriteTo encodes and writes the message to a writer. Implements io.WriterTo.
|
||||
func (m *Message) WriteTo(w io.Writer) (n int, err error) {
|
||||
if n, err := w.Write(m.header()); err != nil {
|
||||
return n, err
|
||||
}
|
||||
if n, err := w.Write(m.Data); err != nil {
|
||||
return messageHeaderSize + n, err
|
||||
}
|
||||
return messageHeaderSize + len(m.Data), nil
|
||||
}
|
||||
|
||||
// header returns a byte slice with the message header.
|
||||
func (m *Message) header() []byte {
|
||||
b := make([]byte, messageHeaderSize)
|
||||
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
|
||||
binary.BigEndian.PutUint64(b[2:10], m.Index)
|
||||
binary.BigEndian.PutUint32(b[10:14], uint32(len(m.Data)))
|
||||
return b
|
||||
}
|
||||
|
||||
// The size of the encoded message header, in bytes.
|
||||
const messageHeaderSize = 2 + 8 + 4
|
||||
|
||||
// MessageEncoder encodes messages to a writer.
|
||||
type MessageEncoder struct {
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
// NewMessageEncoder returns a new instance of the MessageEncoder.
|
||||
func NewMessageEncoder(w io.Writer) *MessageEncoder {
|
||||
return &MessageEncoder{w: w}
|
||||
}
|
||||
|
||||
// Encode writes a message to the encoder's writer.
|
||||
func (enc *MessageEncoder) Encode(m *Message) error {
|
||||
// Generate and write message header.
|
||||
var b [messageHeaderSize]byte
|
||||
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
|
||||
binary.BigEndian.PutUint64(b[2:10], m.Index)
|
||||
binary.BigEndian.PutUint32(b[10:14], uint32(len(m.Data)))
|
||||
if _, err := enc.w.Write(b[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write data.
|
||||
if _, err := enc.w.Write(m.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MessageDecoder decodes messages from a reader.
|
||||
type MessageDecoder struct {
|
||||
r io.Reader
|
||||
|
|
Loading…
Reference in New Issue