diff --git a/broker/TODO b/broker/TODO index 6f5f72f8e3..d28ec8c00f 100644 --- a/broker/TODO +++ b/broker/TODO @@ -3,9 +3,10 @@ Broker ## Uncompleted +- [ ] HTTP Handler +- [ ] Broker client - [ ] Cluster configuration integration - [ ] Broker FSM snapshotting -- [ ] HTTP Handler - [ ] Replica heartbeats - [ ] Segment topic files. - [ ] Topic truncation @@ -23,3 +24,4 @@ Broker - [x] Config topic - [x] Stream topic from index - [x] Test coverage +- [x] Move topic id into message. diff --git a/broker/broker.go b/broker/broker.go index 7df8450d4e..ad91557719 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -118,14 +118,18 @@ func (b *Broker) Publish(topic string, m *Message) (uint64, error) { return 0, ErrTopicNotFound } - return b.log.Apply(encodeTopicMessage(t.id, m)) + // Attach topic id to the message and encode. + m.TopicID = t.id + buf, _ := m.MarshalBinary() + return b.log.Apply(buf) } // publishConfig writes a configuration change to the config topic. // This always waits until the change is applied to the log. func (b *Broker) publishConfig(m *Message) error { // Encode topic, type, and data together. - buf := encodeTopicMessage(configTopicID, m) + m.TopicID = configTopicID + buf, _ := m.MarshalBinary() // Apply to the raft log. index, err := b.log.Apply(buf) @@ -393,13 +397,17 @@ func (fsm *brokerFSM) Apply(e *raft.LogEntry) error { return nil } - // Decode the topic message from the raft log. - topicID, m := decodeTopicMessage(e.Data) + // Decode the message from the raft log. + m := &Message{} + err := m.UnmarshalBinary(e.Data) + assert(err == nil, "message unmarshal: %s", err) + + // Add the raft index to the message. m.Index = e.Index // Find topic by id. Ignore topic if it doesn't exist. - t := b.topics[topicID] - assert(t != nil, "topic not found: %d", topicID) + t := b.topics[m.TopicID] + assert(t != nil, "topic not found: %d", m.TopicID) // Update the broker configuration. switch m.Type { @@ -470,25 +478,6 @@ func (fsm *brokerFSM) Restore(r io.Reader) error { return nil } -// encodes a topic id and message together for the raft log. -func encodeTopicMessage(topicID uint32, m *Message) []byte { - b := make([]byte, 4+2+len(m.Data)) - binary.BigEndian.PutUint32(b, topicID) - binary.BigEndian.PutUint16(b[4:6], uint16(m.Type)) - copy(b[6:], m.Data) - return b -} - -// decodes a topic id and message together from the raft log. -func decodeTopicMessage(b []byte) (topicID uint32, m *Message) { - topicID = binary.BigEndian.Uint32(b[0:4]) - m = &Message{ - Type: MessageType(binary.BigEndian.Uint16(b[4:6])), - Data: b[6:], - } - return -} - // topic represents a single named queue of messages. // Each topic is identified by a unique path. type topic struct { @@ -585,7 +574,7 @@ func (t *topic) encode(m *Message) error { // Encode message. b := make([]byte, messageHeaderSize+len(m.Data)) - copy(b, m.header()) + copy(b, m.marshalHeader()) copy(b[messageHeaderSize:], m.Data) // Write to topic file. @@ -762,18 +751,19 @@ const ( ) // The size of the encoded message header, in bytes. -const messageHeaderSize = 2 + 8 + 4 +const messageHeaderSize = 2 + 4 + 8 + 4 // Message represents a single item in a topic. type Message struct { - Type MessageType - Index uint64 - Data []byte + Type MessageType + TopicID uint32 + Index uint64 + Data []byte } // WriteTo encodes and writes the message to a writer. Implements io.WriterTo. func (m *Message) WriteTo(w io.Writer) (n int, err error) { - if n, err := w.Write(m.header()); err != nil { + if n, err := w.Write(m.marshalHeader()); err != nil { return n, err } if n, err := w.Write(m.Data); err != nil { @@ -782,15 +772,45 @@ func (m *Message) WriteTo(w io.Writer) (n int, err error) { return messageHeaderSize + len(m.Data), nil } -// header returns a byte slice with the message header. -func (m *Message) header() []byte { +// MarshalBinary returns a binary representation of the message. +// This implements encoding.BinaryMarshaler. An error cannot be returned. +func (m *Message) MarshalBinary() ([]byte, error) { + b := make([]byte, messageHeaderSize+len(m.Data)) + copy(b, m.marshalHeader()) + copy(b[messageHeaderSize:], m.Data) + return b, nil +} + +// UnmarshalBinary reads a message from a binary encoded slice. +// This implements encoding.BinaryUnmarshaler. +func (m *Message) UnmarshalBinary(b []byte) error { + m.unmarshalHeader(b) + if len(b[messageHeaderSize:]) < len(m.Data) { + return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data)) + } + copy(m.Data, b[messageHeaderSize:]) + return nil +} + +// marshalHeader returns a byte slice with the message header. +func (m *Message) marshalHeader() []byte { b := make([]byte, messageHeaderSize) binary.BigEndian.PutUint16(b[0:2], uint16(m.Type)) - binary.BigEndian.PutUint64(b[2:10], m.Index) - binary.BigEndian.PutUint32(b[10:14], uint32(len(m.Data))) + binary.BigEndian.PutUint32(b[2:6], m.TopicID) + binary.BigEndian.PutUint64(b[6:14], m.Index) + binary.BigEndian.PutUint32(b[14:18], uint32(len(m.Data))) return b } +// unmarshalHeader reads message header data from binary encoded slice. +// The data field is appropriately sized but is not filled. +func (m *Message) unmarshalHeader(b []byte) { + m.Type = MessageType(binary.BigEndian.Uint16(b[0:2])) + m.TopicID = binary.BigEndian.Uint32(b[2:6]) + m.Index = binary.BigEndian.Uint64(b[6:14]) + m.Data = make([]byte, binary.BigEndian.Uint32(b[14:18])) +} + // MessageDecoder decodes messages from a reader. type MessageDecoder struct { r io.Reader @@ -803,17 +823,15 @@ func NewMessageDecoder(r io.Reader) *MessageDecoder { // Decode reads a message from the decoder's reader. func (dec *MessageDecoder) Decode(m *Message) error { + // Read header bytes. var b [messageHeaderSize]byte if _, err := io.ReadFull(dec.r, b[:]); err != nil { return err } - m.Type = MessageType(binary.BigEndian.Uint16(b[0:2])) - m.Index = binary.BigEndian.Uint64(b[2:10]) - m.Data = make([]byte, binary.BigEndian.Uint32(b[10:14])) + m.unmarshalHeader(b[:]) // Read data. - if n, err := io.ReadFull(dec.r, m.Data); err != nil { - warn("io.2", n, len(m.Data), err) + if _, err := io.ReadFull(dec.r, m.Data); err != nil { return err } diff --git a/broker/broker_test.go b/broker/broker_test.go index e9422ac041..fbf369f5d9 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -94,7 +94,7 @@ func TestBroker_Publish(t *testing.T) { // Read out the published message. if err := dec.Decode(&m); err != nil { t.Fatalf("decode: %s", err) - } else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, Index: 5, Data: []byte("0000")}) { + } else if !reflect.DeepEqual(&m, &broker.Message{Type: 100, TopicID: 1, Index: 5, Data: []byte("0000")}) { t.Fatalf("unexpected message: %#v", &m) } @@ -299,9 +299,3 @@ func tempfile() string { func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } - -func ok(err error) { - if err != nil { - panic("unexpected error") - } -} diff --git a/broker/client.go b/broker/client.go index afb8ffaf35..2e79f2f165 100644 --- a/broker/client.go +++ b/broker/client.go @@ -9,8 +9,9 @@ import ( "time" ) -// ReconnectTimeout is the time to wait between stream disconnects before retrying. -const ReconnectTimeout = 100 * time.Millisecond +// DefaultReconnectTimeout is the default time to wait between when a broker +// stream disconnects and another connection is retried. +const DefaultReconnectTimeout = 100 * time.Millisecond // Client represents a client for the broker's HTTP API. // Once opened, the client will stream down all messages that @@ -20,18 +21,22 @@ type Client struct { urls []*url.URL // list of URLs for all known brokers. opened bool - done chan struct{} // disconnection notification + done chan chan struct{} // disconnection notification // Channel streams messages from the broker. // Messages can be duplicated so it is important to check the index // of the incoming message index to make sure it has not been processed. C chan *Message + + // The amount of time to wait before reconnecting to a broker stream. + ReconnectTimeout time.Duration } // NewClient returns a new instance of Client. func NewClient(name string) *Client { return &Client{ - name: name, + name: name, + ReconnectTimeout: DefaultReconnectTimeout, } } @@ -65,7 +70,7 @@ func (c *Client) Open(urls []*url.URL) error { c.C = make(chan *Message, 0) // Open the streamer. - c.done = make(chan struct{}) + c.done = make(chan chan struct{}) go c.streamer(c.done) // Set open flag. @@ -84,14 +89,16 @@ func (c *Client) Close() error { return ErrClientClosed } + // Shutdown streamer. + ch := make(chan struct{}) + c.done <- ch + <-ch + c.done = nil + // Close message stream. close(c.C) c.C = nil - // Shutdown streamer. - close(c.done) - c.done = nil - // Unset open flag. c.opened = false @@ -99,11 +106,12 @@ func (c *Client) Close() error { } // streamer connects to a broker server and streams the replica's messages. -func (c *Client) streamer(done chan struct{}) { +func (c *Client) streamer(done chan chan struct{}) { for { // Check for the client disconnection. select { - case <-done: + case ch := <-done: + close(ch) return default: } @@ -123,38 +131,57 @@ func (c *Client) streamer(done chan struct{}) { } // streamFromURL connects to a broker server and streams the replica's messages. -func (c *Client) streamFromURL(u *url.URL, done chan struct{}) error { +func (c *Client) streamFromURL(u *url.URL, done chan chan struct{}) error { + // Set the replica name on the URL and open the stream. u.RawQuery = url.Values{"name": {c.name}}.Encode() resp, err := http.Get(u.String()) if err != nil { - time.Sleep(ReconnectTimeout) + time.Sleep(c.ReconnectTimeout) return nil } defer func() { _ = resp.Body.Close() }() // Ensure that we received a 200 OK from the server before streaming. if resp.StatusCode != http.StatusOK { - warn("status:", resp.StatusCode) + time.Sleep(c.ReconnectTimeout) + return nil } - // Continuously decode messages from request body. - dec := NewMessageDecoder(resp.Body) - for { - // Decode message from the stream. - m := &Message{} - if err := dec.Decode(m); err != nil { - return err + // Continuously decode messages from request body in a separate goroutine. + errNotify := make(chan error, 0) + go func() { + dec := NewMessageDecoder(resp.Body) + for { + // Decode message from the stream. + m := &Message{} + if err := dec.Decode(m); err != nil { + errNotify <- err + return + } + + // Write message to streaming channel. + c.C <- m } + }() - // Send message to channel. - c.C <- m + // Check for the client disconnect or error from the stream. + select { + case ch := <-done: + // Close body. + resp.Body.Close() - // Check for notification of disconnect. + // Clear message buffer. select { - case <-done: - return errDone + case <-c.C: default: } + + // Notify the close function and return marker error. + close(ch) + return errDone + + case err := <-errNotify: + return err } } diff --git a/broker/client_test.go b/broker/client_test.go index bbcd3e1bfa..abffb6d876 100644 --- a/broker/client_test.go +++ b/broker/client_test.go @@ -3,18 +3,27 @@ package broker_test import ( "net/url" "testing" + "time" "github.com/influxdb/influxdb/broker" ) +// Ensure the client name can be retrieved. +func TestClient_Name(t *testing.T) { + c := NewClient("node0") + defer c.Close() + if name := c.Name(); name != "node0" { + t.Fatalf("unexpected name: %s", name) + } +} + // Ensure that a client can open a connect to the broker. func TestClient_Open(t *testing.T) { c := NewClient("node0") defer c.Close() // Create replica on broker. - b := c.Handler.Broker - ok(b.CreateReplica("node0")) + c.Handler.Broker.CreateReplica("node0") // Open client to broker. u, _ := url.Parse(c.Handler.HTTPServer.URL) @@ -22,7 +31,7 @@ func TestClient_Open(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - // Receive a set of messages from the stream. + // Receive a message from the stream. if m := <-c.C; m.Type != broker.CreateReplicaMessageType { t.Fatalf("unexpected message type: %x", m.Type) } @@ -33,6 +42,49 @@ func TestClient_Open(t *testing.T) { } } +// Ensure that opening an already open client returns an error. +func TestClient_Open_ErrClientOpen(t *testing.T) { + c := NewClient("node0") + defer c.Close() + + // Open client to broker. + u, _ := url.Parse(c.Handler.HTTPServer.URL) + c.Open([]*url.URL{u}) + if err := c.Open([]*url.URL{u}); err != broker.ErrClientOpen { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that opening a client without a broker URL returns an error. +func TestClient_Open_ErrBrokerURLRequired(t *testing.T) { + c := NewClient("node0") + defer c.Close() + if err := c.Open([]*url.URL{}); err != broker.ErrBrokerURLRequired { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that a client can close while a message is pending. +func TestClient_Close(t *testing.T) { + c := NewClient("node0") + defer c.Close() + + // Create replica on broker. + c.Handler.Broker.CreateReplica("node0") + + // Open client to broker. + u, _ := url.Parse(c.Handler.HTTPServer.URL) + if err := c.Open([]*url.URL{u}); err != nil { + t.Fatalf("unexpected error: %s", err) + } + time.Sleep(10 * time.Millisecond) + + // Close connection to the broker. + if err := c.Client.Close(); err != nil { + t.Fatalf("unexpected error: %s", err) + } +} + // Client represents a test wrapper for the broker client. type Client struct { *broker.Client