diff --git a/broker/broker.go b/broker/broker.go index 19cc78cb24..3c229b1eff 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,6 +2,7 @@ package broker import ( "encoding/binary" + "encoding/json" "fmt" "io" "os" @@ -17,15 +18,17 @@ type Broker struct { mu sync.Mutex path string // data directory - log *raft.Log // internal raft log - topics map[string]*topic // topic writers by path + log *raft.Log // internal raft log + topics map[string]*topic // topic writers by path + streams map[string]*Stream // stream by name } // New returns a new instance of a Broker with default values. func New() *Broker { b := &Broker{ - log: raft.NewLog(), - topics: make(map[string]*topic), + log: raft.NewLog(), + topics: make(map[string]*topic), + streams: make(map[string]*Stream), } b.log.FSM = (*brokerFSM)(b) return b @@ -107,6 +110,35 @@ func (b *Broker) Wait(index uint64) error { return b.log.Wait(index) } +// Stream returns a stream by name. +func (b *Broker) Stream(name string) *Stream { + return b.streams[name] +} + +// CreateStream creates a new named stream. +func (b *Broker) CreateStream(name string) error { + // Create message. + var m Message + m.Type = createStreamMessageType + m.Data, _ = json.Marshal(&createStream{Name: name}) + + // Write to the log. + index, err := b.Publish("config", &m) + if err != nil { + return err + } + + // Wait until applied. + return b.log.Wait(index) +} + +// RemoveStream deletes an existing stream by name. +func (b *Broker) RemoveStream(name string) error { + // TODO: Add DeleteStream command to the log. + // TODO: Wait until applied. + return nil +} + // Returns the topic by name. Creates it if it doesn't exist. func (b *Broker) createTopicIfNotExists(name string) (*topic, error) { // Return it if it already exists. @@ -164,6 +196,16 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error { m.Index = e.Index m.Data = e.Data[3+sz:] + // Update the broker configuration. + var err error + switch m.Type { + case createStreamMessageType: + err = fsm.applyCreateStream(&m) + } + if err != nil { + return err + } + // Retrieve the topic. t, err := b.createTopicIfNotExists(topic) if err != nil { @@ -178,6 +220,21 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error { return nil } +// applyCreateStream processes a createStream message. +func (fsm *brokerFSM) applyCreateStream(m *Message) error { + var c createStream + if err := json.Unmarshal(m.Data, &c); err != nil { + return err + } + + // Create a new named stream. + fsm.streams[c.Name] = &Stream{ + broker: (*Broker)(fsm), + subscriptions: make(map[string]*subscription), + } + return nil +} + // Index returns the highest index that the broker has seen. func (fsm *brokerFSM) Index() (uint64, error) { // TODO: Retrieve index. @@ -214,6 +271,65 @@ func (t *topic) writeMessage(m *Message) error { return nil } +// Stream represents a collection of subscriptions to topics on the broker. +// The stream maintains the highest index read for each topic so that the +// broker can use this high water mark for trimming the topic logs. +type Stream struct { + broker *Broker + w *streamWriter + subscriptions map[string]*subscription +} + +// Subscribe subscribes the stream to a given topic. +func (s *Stream) Subscribe(topic string, index uint64) error { + // TODO: Add Subscribe command to the log. + // TODO: Wait until applied. + return nil +} + +// Unsubscribe removes a subscription from the stream to a given topic. +func (s *Stream) Unsubscribe(topic string) error { + // TODO: Add Unsubscribe command to the log. + // TODO: Wait until applied. + return nil +} + +// WriteTo begins writing messages to a named stream. +// Only one writer is allowed on a stream at a time. +func (s *Stream) WriteTo(w io.Writer) (int, error) { + // Close existing writer on stream. + if s.w != nil { + s.w.Close() + s.w = nil + } + + // Set a new writer on the stream. + s.w = &streamWriter{w: w, done: make(chan struct{})} + + // TODO: Return bytes written. + return 0, nil +} + +type streamWriter struct { + w io.Writer + done chan struct{} +} + +// Close closes the writer. +func (w *streamWriter) Close() error { + if w.done != nil { + close(w.done) + w.done = nil + } + return nil +} + +// subscription represents a single topic subscription for a stream. +type subscription struct { + name string // topic name + index uint64 // highest index received +} + // assert will panic with a given formatted message if the given condition is false. func assert(condition bool, msg string, v ...interface{}) { if !condition { diff --git a/broker/broker_test.go b/broker/broker_test.go index f33f204a35..04894a775e 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -1,9 +1,12 @@ package broker_test import ( + "bytes" "io/ioutil" "os" + "reflect" "testing" + "time" "github.com/influxdb/influxdb/broker" ) @@ -24,7 +27,34 @@ func TestBroker_Write(t *testing.T) { t.Fatalf("wait error: %s", err) } - // TODO: Read the message back from the broker. + // Create a new named stream and subscription. + if err := b.CreateStream("node0"); err != nil { + t.Fatalf("create stream: %s", err) + } + + // Retrieve stream and subscribe. + s := b.Stream("node0") + if err := s.Subscribe("foo/bar", 0); err != nil { + t.Fatalf("subscribe: %s", err) + } + + // Read message from the stream. + var buf bytes.Buffer + go func() { + if _, err := s.WriteTo(&buf); err != nil { + t.Fatalf("write to: %s", err) + } + }() + time.Sleep(10 * time.Millisecond) + + // Read out the message. + var m broker.Message + dec := broker.NewMessageDecoder(&buf) + if err := dec.Decode(&m); err != nil { + t.Fatalf("decode: %s", err) + } else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 2, Data: []byte("0000")}) { + t.Fatalf("unexpected message: %#v", &m) + } } // Broker is a wrapper for broker.Broker that creates the broker in a temporary location. diff --git a/broker/errors.go b/broker/errors.go new file mode 100644 index 0000000000..0451085203 --- /dev/null +++ b/broker/errors.go @@ -0,0 +1,8 @@ +package broker + +import "errors" + +var ( + // ErrSubscribed is returned when a stream is already subscribed to a topic. + ErrSubscribed = errors.New("already subscribed") +) diff --git a/broker/message.go b/broker/message.go index 8fe2af4dae..bc27faa1cb 100644 --- a/broker/message.go +++ b/broker/message.go @@ -5,12 +5,17 @@ import ( "io" ) -// The size of the encoded message header, in bytes. -const messageHeaderSize = 2 + 8 + 4 - // MessageType represents the type of message. type MessageType uint16 +const ( + BrokerMessageType = 1 << 15 +) + +const ( + createStreamMessageType = BrokerMessageType | MessageType(0) +) + // Message represents a single item in a topic. type Message struct { Type MessageType @@ -18,6 +23,14 @@ type Message struct { Data []byte } +// createStream creates a new named stream. +type createStream struct { + Name string `json:"name"` +} + +// The size of the encoded message header, in bytes. +const messageHeaderSize = 2 + 8 + 4 + // MessageEncoder encodes messages to a writer. type MessageEncoder struct { w io.Writer