Broker/stream refactoring.

pull/903/head
Ben Johnson 2014-10-04 11:27:12 -06:00
parent b2eb9f9190
commit 37b11da4e8
4 changed files with 175 additions and 8 deletions

View File

@ -2,6 +2,7 @@ package broker
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
@ -17,15 +18,17 @@ type Broker struct {
mu sync.Mutex
path string // data directory
log *raft.Log // internal raft log
topics map[string]*topic // topic writers by path
log *raft.Log // internal raft log
topics map[string]*topic // topic writers by path
streams map[string]*Stream // stream by name
}
// New returns a new instance of a Broker with default values.
func New() *Broker {
b := &Broker{
log: raft.NewLog(),
topics: make(map[string]*topic),
log: raft.NewLog(),
topics: make(map[string]*topic),
streams: make(map[string]*Stream),
}
b.log.FSM = (*brokerFSM)(b)
return b
@ -107,6 +110,35 @@ func (b *Broker) Wait(index uint64) error {
return b.log.Wait(index)
}
// Stream returns a stream by name.
func (b *Broker) Stream(name string) *Stream {
return b.streams[name]
}
// CreateStream creates a new named stream.
func (b *Broker) CreateStream(name string) error {
// Create message.
var m Message
m.Type = createStreamMessageType
m.Data, _ = json.Marshal(&createStream{Name: name})
// Write to the log.
index, err := b.Publish("config", &m)
if err != nil {
return err
}
// Wait until applied.
return b.log.Wait(index)
}
// RemoveStream deletes an existing stream by name.
func (b *Broker) RemoveStream(name string) error {
// TODO: Add DeleteStream command to the log.
// TODO: Wait until applied.
return nil
}
// Returns the topic by name. Creates it if it doesn't exist.
func (b *Broker) createTopicIfNotExists(name string) (*topic, error) {
// Return it if it already exists.
@ -164,6 +196,16 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error {
m.Index = e.Index
m.Data = e.Data[3+sz:]
// Update the broker configuration.
var err error
switch m.Type {
case createStreamMessageType:
err = fsm.applyCreateStream(&m)
}
if err != nil {
return err
}
// Retrieve the topic.
t, err := b.createTopicIfNotExists(topic)
if err != nil {
@ -178,6 +220,21 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error {
return nil
}
// applyCreateStream processes a createStream message.
func (fsm *brokerFSM) applyCreateStream(m *Message) error {
var c createStream
if err := json.Unmarshal(m.Data, &c); err != nil {
return err
}
// Create a new named stream.
fsm.streams[c.Name] = &Stream{
broker: (*Broker)(fsm),
subscriptions: make(map[string]*subscription),
}
return nil
}
// Index returns the highest index that the broker has seen.
func (fsm *brokerFSM) Index() (uint64, error) {
// TODO: Retrieve index.
@ -214,6 +271,65 @@ func (t *topic) writeMessage(m *Message) error {
return nil
}
// Stream represents a collection of subscriptions to topics on the broker.
// The stream maintains the highest index read for each topic so that the
// broker can use this high water mark for trimming the topic logs.
type Stream struct {
broker *Broker
w *streamWriter
subscriptions map[string]*subscription
}
// Subscribe subscribes the stream to a given topic.
func (s *Stream) Subscribe(topic string, index uint64) error {
// TODO: Add Subscribe command to the log.
// TODO: Wait until applied.
return nil
}
// Unsubscribe removes a subscription from the stream to a given topic.
func (s *Stream) Unsubscribe(topic string) error {
// TODO: Add Unsubscribe command to the log.
// TODO: Wait until applied.
return nil
}
// WriteTo begins writing messages to a named stream.
// Only one writer is allowed on a stream at a time.
func (s *Stream) WriteTo(w io.Writer) (int, error) {
// Close existing writer on stream.
if s.w != nil {
s.w.Close()
s.w = nil
}
// Set a new writer on the stream.
s.w = &streamWriter{w: w, done: make(chan struct{})}
// TODO: Return bytes written.
return 0, nil
}
type streamWriter struct {
w io.Writer
done chan struct{}
}
// Close closes the writer.
func (w *streamWriter) Close() error {
if w.done != nil {
close(w.done)
w.done = nil
}
return nil
}
// subscription represents a single topic subscription for a stream.
type subscription struct {
name string // topic name
index uint64 // highest index received
}
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {

View File

@ -1,9 +1,12 @@
package broker_test
import (
"bytes"
"io/ioutil"
"os"
"reflect"
"testing"
"time"
"github.com/influxdb/influxdb/broker"
)
@ -24,7 +27,34 @@ func TestBroker_Write(t *testing.T) {
t.Fatalf("wait error: %s", err)
}
// TODO: Read the message back from the broker.
// Create a new named stream and subscription.
if err := b.CreateStream("node0"); err != nil {
t.Fatalf("create stream: %s", err)
}
// Retrieve stream and subscribe.
s := b.Stream("node0")
if err := s.Subscribe("foo/bar", 0); err != nil {
t.Fatalf("subscribe: %s", err)
}
// Read message from the stream.
var buf bytes.Buffer
go func() {
if _, err := s.WriteTo(&buf); err != nil {
t.Fatalf("write to: %s", err)
}
}()
time.Sleep(10 * time.Millisecond)
// Read out the message.
var m broker.Message
dec := broker.NewMessageDecoder(&buf)
if err := dec.Decode(&m); err != nil {
t.Fatalf("decode: %s", err)
} else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 2, Data: []byte("0000")}) {
t.Fatalf("unexpected message: %#v", &m)
}
}
// Broker is a wrapper for broker.Broker that creates the broker in a temporary location.

8
broker/errors.go Normal file
View File

@ -0,0 +1,8 @@
package broker
import "errors"
var (
// ErrSubscribed is returned when a stream is already subscribed to a topic.
ErrSubscribed = errors.New("already subscribed")
)

View File

@ -5,12 +5,17 @@ import (
"io"
)
// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 4
// MessageType represents the type of message.
type MessageType uint16
const (
BrokerMessageType = 1 << 15
)
const (
createStreamMessageType = BrokerMessageType | MessageType(0)
)
// Message represents a single item in a topic.
type Message struct {
Type MessageType
@ -18,6 +23,14 @@ type Message struct {
Data []byte
}
// createStream creates a new named stream.
type createStream struct {
Name string `json:"name"`
}
// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 4
// MessageEncoder encodes messages to a writer.
type MessageEncoder struct {
w io.Writer