Refactor topic/message format.

pull/903/head
Ben Johnson 2014-10-17 09:53:10 -06:00
parent 14fd40cdb5
commit 7dbfc11b85
5 changed files with 170 additions and 77 deletions

View File

@ -3,9 +3,10 @@ Broker
## Uncompleted
- [ ] HTTP Handler
- [ ] Broker client
- [ ] Cluster configuration integration
- [ ] Broker FSM snapshotting
- [ ] HTTP Handler
- [ ] Replica heartbeats
- [ ] Segment topic files.
- [ ] Topic truncation
@ -23,3 +24,4 @@ Broker
- [x] Config topic
- [x] Stream topic from index
- [x] Test coverage
- [x] Move topic id into message.

View File

@ -118,14 +118,18 @@ func (b *Broker) Publish(topic string, m *Message) (uint64, error) {
return 0, ErrTopicNotFound
}
return b.log.Apply(encodeTopicMessage(t.id, m))
// Attach topic id to the message and encode.
m.TopicID = t.id
buf, _ := m.MarshalBinary()
return b.log.Apply(buf)
}
// publishConfig writes a configuration change to the config topic.
// This always waits until the change is applied to the log.
func (b *Broker) publishConfig(m *Message) error {
// Encode topic, type, and data together.
buf := encodeTopicMessage(configTopicID, m)
m.TopicID = configTopicID
buf, _ := m.MarshalBinary()
// Apply to the raft log.
index, err := b.log.Apply(buf)
@ -393,13 +397,17 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error {
return nil
}
// Decode the topic message from the raft log.
topicID, m := decodeTopicMessage(e.Data)
// Decode the message from the raft log.
m := &Message{}
err := m.UnmarshalBinary(e.Data)
assert(err == nil, "message unmarshal: %s", err)
// Add the raft index to the message.
m.Index = e.Index
// Find topic by id. Ignore topic if it doesn't exist.
t := b.topics[topicID]
assert(t != nil, "topic not found: %d", topicID)
t := b.topics[m.TopicID]
assert(t != nil, "topic not found: %d", m.TopicID)
// Update the broker configuration.
switch m.Type {
@ -470,25 +478,6 @@ func (fsm *brokerFSM) Restore(r io.Reader) error {
return nil
}
// encodes a topic id and message together for the raft log.
func encodeTopicMessage(topicID uint32, m *Message) []byte {
b := make([]byte, 4+2+len(m.Data))
binary.BigEndian.PutUint32(b, topicID)
binary.BigEndian.PutUint16(b[4:6], uint16(m.Type))
copy(b[6:], m.Data)
return b
}
// decodes a topic id and message together from the raft log.
func decodeTopicMessage(b []byte) (topicID uint32, m *Message) {
topicID = binary.BigEndian.Uint32(b[0:4])
m = &Message{
Type: MessageType(binary.BigEndian.Uint16(b[4:6])),
Data: b[6:],
}
return
}
// topic represents a single named queue of messages.
// Each topic is identified by a unique path.
type topic struct {
@ -585,7 +574,7 @@ func (t *topic) encode(m *Message) error {
// Encode message.
b := make([]byte, messageHeaderSize+len(m.Data))
copy(b, m.header())
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
// Write to topic file.
@ -762,18 +751,19 @@ const (
)
// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 4
const messageHeaderSize = 2 + 4 + 8 + 4
// Message represents a single item in a topic.
type Message struct {
Type MessageType
Index uint64
Data []byte
Type MessageType
TopicID uint32
Index uint64
Data []byte
}
// WriteTo encodes and writes the message to a writer. Implements io.WriterTo.
func (m *Message) WriteTo(w io.Writer) (n int, err error) {
if n, err := w.Write(m.header()); err != nil {
if n, err := w.Write(m.marshalHeader()); err != nil {
return n, err
}
if n, err := w.Write(m.Data); err != nil {
@ -782,15 +772,45 @@ func (m *Message) WriteTo(w io.Writer) (n int, err error) {
return messageHeaderSize + len(m.Data), nil
}
// header returns a byte slice with the message header.
func (m *Message) header() []byte {
// MarshalBinary returns a binary representation of the message.
// This implements encoding.BinaryMarshaler. An error cannot be returned.
func (m *Message) MarshalBinary() ([]byte, error) {
b := make([]byte, messageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
return b, nil
}
// UnmarshalBinary reads a message from a binary encoded slice.
// This implements encoding.BinaryUnmarshaler.
func (m *Message) UnmarshalBinary(b []byte) error {
m.unmarshalHeader(b)
if len(b[messageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data))
}
copy(m.Data, b[messageHeaderSize:])
return nil
}
// marshalHeader returns a byte slice with the message header.
func (m *Message) marshalHeader() []byte {
b := make([]byte, messageHeaderSize)
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
binary.BigEndian.PutUint64(b[2:10], m.Index)
binary.BigEndian.PutUint32(b[10:14], uint32(len(m.Data)))
binary.BigEndian.PutUint32(b[2:6], m.TopicID)
binary.BigEndian.PutUint64(b[6:14], m.Index)
binary.BigEndian.PutUint32(b[14:18], uint32(len(m.Data)))
return b
}
// unmarshalHeader reads message header data from binary encoded slice.
// The data field is appropriately sized but is not filled.
func (m *Message) unmarshalHeader(b []byte) {
m.Type = MessageType(binary.BigEndian.Uint16(b[0:2]))
m.TopicID = binary.BigEndian.Uint32(b[2:6])
m.Index = binary.BigEndian.Uint64(b[6:14])
m.Data = make([]byte, binary.BigEndian.Uint32(b[14:18]))
}
// MessageDecoder decodes messages from a reader.
type MessageDecoder struct {
r io.Reader
@ -803,17 +823,15 @@ func NewMessageDecoder(r io.Reader) *MessageDecoder {
// Decode reads a message from the decoder's reader.
func (dec *MessageDecoder) Decode(m *Message) error {
// Read header bytes.
var b [messageHeaderSize]byte
if _, err := io.ReadFull(dec.r, b[:]); err != nil {
return err
}
m.Type = MessageType(binary.BigEndian.Uint16(b[0:2]))
m.Index = binary.BigEndian.Uint64(b[2:10])
m.Data = make([]byte, binary.BigEndian.Uint32(b[10:14]))
m.unmarshalHeader(b[:])
// Read data.
if n, err := io.ReadFull(dec.r, m.Data); err != nil {
warn("io.2", n, len(m.Data), err)
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
return err
}

View File

@ -94,7 +94,7 @@ func TestBroker_Publish(t *testing.T) {
// Read out the published message.
if err := dec.Decode(&m); err != nil {
t.Fatalf("decode: %s", err)
} else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 5, Data: []byte("0000")}) {
} else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, TopicID: 1, Index: 5, Data: []byte("0000")}) {
t.Fatalf("unexpected message: %#v", &m)
}
@ -299,9 +299,3 @@ func tempfile() string {
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
func ok(err error) {
if err != nil {
panic("unexpected error")
}
}

View File

@ -9,8 +9,9 @@ import (
"time"
)
// ReconnectTimeout is the time to wait between stream disconnects before retrying.
const ReconnectTimeout = 100 * time.Millisecond
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
const DefaultReconnectTimeout = 100 * time.Millisecond
// Client represents a client for the broker's HTTP API.
// Once opened, the client will stream down all messages that
@ -20,18 +21,22 @@ type Client struct {
urls []*url.URL // list of URLs for all known brokers.
opened bool
done chan struct{} // disconnection notification
done chan chan struct{} // disconnection notification
// Channel streams messages from the broker.
// Messages can be duplicated so it is important to check the index
// of the incoming message index to make sure it has not been processed.
C chan *Message
// The amount of time to wait before reconnecting to a broker stream.
ReconnectTimeout time.Duration
}
// NewClient returns a new instance of Client.
func NewClient(name string) *Client {
return &Client{
name: name,
name: name,
ReconnectTimeout: DefaultReconnectTimeout,
}
}
@ -65,7 +70,7 @@ func (c *Client) Open(urls []*url.URL) error {
c.C = make(chan *Message, 0)
// Open the streamer.
c.done = make(chan struct{})
c.done = make(chan chan struct{})
go c.streamer(c.done)
// Set open flag.
@ -84,14 +89,16 @@ func (c *Client) Close() error {
return ErrClientClosed
}
// Shutdown streamer.
ch := make(chan struct{})
c.done <- ch
<-ch
c.done = nil
// Close message stream.
close(c.C)
c.C = nil
// Shutdown streamer.
close(c.done)
c.done = nil
// Unset open flag.
c.opened = false
@ -99,11 +106,12 @@ func (c *Client) Close() error {
}
// streamer connects to a broker server and streams the replica's messages.
func (c *Client) streamer(done chan struct{}) {
func (c *Client) streamer(done chan chan struct{}) {
for {
// Check for the client disconnection.
select {
case <-done:
case ch := <-done:
close(ch)
return
default:
}
@ -123,38 +131,57 @@ func (c *Client) streamer(done chan struct{}) {
}
// streamFromURL connects to a broker server and streams the replica's messages.
func (c *Client) streamFromURL(u *url.URL, done chan struct{}) error {
func (c *Client) streamFromURL(u *url.URL, done chan chan struct{}) error {
// Set the replica name on the URL and open the stream.
u.RawQuery = url.Values{"name": {c.name}}.Encode()
resp, err := http.Get(u.String())
if err != nil {
time.Sleep(ReconnectTimeout)
time.Sleep(c.ReconnectTimeout)
return nil
}
defer func() { _ = resp.Body.Close() }()
// Ensure that we received a 200 OK from the server before streaming.
if resp.StatusCode != http.StatusOK {
warn("status:", resp.StatusCode)
time.Sleep(c.ReconnectTimeout)
return nil
}
// Continuously decode messages from request body.
dec := NewMessageDecoder(resp.Body)
for {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err != nil {
return err
// Continuously decode messages from request body in a separate goroutine.
errNotify := make(chan error, 0)
go func() {
dec := NewMessageDecoder(resp.Body)
for {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err != nil {
errNotify <- err
return
}
// Write message to streaming channel.
c.C <- m
}
}()
// Send message to channel.
c.C <- m
// Check for the client disconnect or error from the stream.
select {
case ch := <-done:
// Close body.
resp.Body.Close()
// Check for notification of disconnect.
// Clear message buffer.
select {
case <-done:
return errDone
case <-c.C:
default:
}
// Notify the close function and return marker error.
close(ch)
return errDone
case err := <-errNotify:
return err
}
}

View File

@ -3,18 +3,27 @@ package broker_test
import (
"net/url"
"testing"
"time"
"github.com/influxdb/influxdb/broker"
)
// Ensure the client name can be retrieved.
func TestClient_Name(t *testing.T) {
c := NewClient("node0")
defer c.Close()
if name := c.Name(); name != "node0" {
t.Fatalf("unexpected name: %s", name)
}
}
// Ensure that a client can open a connect to the broker.
func TestClient_Open(t *testing.T) {
c := NewClient("node0")
defer c.Close()
// Create replica on broker.
b := c.Handler.Broker
ok(b.CreateReplica("node0"))
c.Handler.Broker.CreateReplica("node0")
// Open client to broker.
u, _ := url.Parse(c.Handler.HTTPServer.URL)
@ -22,7 +31,7 @@ func TestClient_Open(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
// Receive a set of messages from the stream.
// Receive a message from the stream.
if m := <-c.C; m.Type != broker.CreateReplicaMessageType {
t.Fatalf("unexpected message type: %x", m.Type)
}
@ -33,6 +42,49 @@ func TestClient_Open(t *testing.T) {
}
}
// Ensure that opening an already open client returns an error.
func TestClient_Open_ErrClientOpen(t *testing.T) {
c := NewClient("node0")
defer c.Close()
// Open client to broker.
u, _ := url.Parse(c.Handler.HTTPServer.URL)
c.Open([]*url.URL{u})
if err := c.Open([]*url.URL{u}); err != broker.ErrClientOpen {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that opening a client without a broker URL returns an error.
func TestClient_Open_ErrBrokerURLRequired(t *testing.T) {
c := NewClient("node0")
defer c.Close()
if err := c.Open([]*url.URL{}); err != broker.ErrBrokerURLRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a client can close while a message is pending.
func TestClient_Close(t *testing.T) {
c := NewClient("node0")
defer c.Close()
// Create replica on broker.
c.Handler.Broker.CreateReplica("node0")
// Open client to broker.
u, _ := url.Parse(c.Handler.HTTPServer.URL)
if err := c.Open([]*url.URL{u}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
time.Sleep(10 * time.Millisecond)
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// Client represents a test wrapper for the broker client.
type Client struct {
*broker.Client