2014-10-21 02:42:03 +00:00
|
|
|
package messaging
|
2014-10-17 04:11:28 +00:00
|
|
|
|
|
|
|
import (
|
2014-10-24 00:54:12 +00:00
|
|
|
"bytes"
|
2014-12-16 21:24:21 +00:00
|
|
|
"encoding/json"
|
2014-10-17 04:11:28 +00:00
|
|
|
"errors"
|
2014-10-24 00:54:12 +00:00
|
|
|
"fmt"
|
2015-01-29 23:07:58 +00:00
|
|
|
"io"
|
2014-12-16 21:24:21 +00:00
|
|
|
"io/ioutil"
|
2014-11-13 05:32:42 +00:00
|
|
|
"log"
|
2014-10-17 04:11:28 +00:00
|
|
|
"math/rand"
|
|
|
|
"net/http"
|
|
|
|
"net/url"
|
2014-11-13 05:32:42 +00:00
|
|
|
"os"
|
2014-10-24 00:54:12 +00:00
|
|
|
"strconv"
|
2014-10-17 04:11:28 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// DefaultReconnectTimeout is the default time to wait between when a broker
|
|
|
|
// stream disconnects and another connection is retried.
|
2015-03-11 18:00:45 +00:00
|
|
|
const DefaultReconnectTimeout = 1000 * time.Millisecond
|
|
|
|
|
|
|
|
// DefaultPingInterval is the default time to wait between checks to the broker.
|
|
|
|
const DefaultPingInterval = 1000 * time.Millisecond
|
2014-10-17 04:11:28 +00:00
|
|
|
|
|
|
|
// Client represents a client for the broker's HTTP API.
|
|
|
|
type Client struct {
|
2015-03-08 06:21:44 +00:00
|
|
|
mu sync.Mutex
|
|
|
|
conns []*Conn
|
|
|
|
url url.URL // current known leader URL
|
|
|
|
urls []url.URL // list of available broker URLs
|
2014-10-17 04:11:28 +00:00
|
|
|
|
|
|
|
opened bool
|
2015-03-11 18:00:45 +00:00
|
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
closing chan struct{}
|
2014-10-17 04:11:28 +00:00
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// The amount of time to wait before reconnecting to a broker stream.
|
|
|
|
ReconnectTimeout time.Duration
|
2014-11-13 05:32:42 +00:00
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// The amount of time between pings to verify the broker is alive.
|
|
|
|
PingInterval time.Duration
|
|
|
|
|
2014-11-13 05:32:42 +00:00
|
|
|
// The logging interface used by the client for out-of-band errors.
|
|
|
|
Logger *log.Logger
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 21:28:43 +00:00
|
|
|
// NewClient returns a new instance of Client with defaults set.
|
|
|
|
func NewClient() *Client {
|
|
|
|
c := &Client{
|
2014-10-17 15:53:10 +00:00
|
|
|
ReconnectTimeout: DefaultReconnectTimeout,
|
2015-03-11 18:00:45 +00:00
|
|
|
PingInterval: DefaultPingInterval,
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
2015-03-08 21:28:43 +00:00
|
|
|
c.SetLogOutput(os.Stderr)
|
|
|
|
return c
|
2015-03-01 14:06:25 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// URL returns the current broker leader's URL.
|
|
|
|
func (c *Client) URL() url.URL {
|
2014-10-17 04:11:28 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
2015-03-08 06:21:44 +00:00
|
|
|
return c.url
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// SetURL sets the current URL to connect to for the client and its connections.
|
|
|
|
func (c *Client) SetURL(u url.URL) {
|
2014-10-24 00:54:12 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
2015-03-08 06:21:44 +00:00
|
|
|
c.setURL(u)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Client) setURL(u url.URL) {
|
|
|
|
// Set the client URL.
|
|
|
|
c.url = u
|
2014-10-24 00:54:12 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Update all connections.
|
|
|
|
for _, conn := range c.conns {
|
|
|
|
conn.SetURL(u)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// RandomizeURL sets a random URL from the configuration.
|
|
|
|
func (c *Client) RandomizeURL() {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
c.randomizeURL()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Client) randomizeURL() {
|
|
|
|
// Clear URL if no brokers exist.
|
|
|
|
if len(c.urls) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Otherwise randomly select a URL.
|
|
|
|
c.setURL(c.urls[rand.Intn(len(c.urls))])
|
2014-10-24 00:54:12 +00:00
|
|
|
}
|
|
|
|
|
2015-01-29 23:07:58 +00:00
|
|
|
// SetLogOutput sets writer for all Client log output.
|
|
|
|
func (c *Client) SetLogOutput(w io.Writer) {
|
|
|
|
c.Logger = log.New(w, "[messaging] ", log.LstdFlags)
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Open reads the configuration from the specified path or uses the URLs provided.
|
|
|
|
func (c *Client) Open(path string, urls []url.URL) error {
|
2014-10-17 04:11:28 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
// Return error if the client is already open.
|
|
|
|
if c.opened {
|
|
|
|
return ErrClientOpen
|
2014-12-16 21:24:21 +00:00
|
|
|
}
|
|
|
|
|
2015-01-07 00:21:32 +00:00
|
|
|
// Read URLs from file if no URLs are provided.
|
|
|
|
if len(urls) == 0 {
|
2015-03-08 06:21:44 +00:00
|
|
|
if b, err := ioutil.ReadFile(path); os.IsNotExist(err) {
|
2015-01-07 00:21:32 +00:00
|
|
|
// nop
|
|
|
|
} else if err != nil {
|
2014-12-16 21:24:21 +00:00
|
|
|
return err
|
2015-01-07 00:21:32 +00:00
|
|
|
} else {
|
2015-03-08 06:21:44 +00:00
|
|
|
var config ClientConfig
|
|
|
|
if err := json.Unmarshal(b, &config); err != nil {
|
2015-01-07 00:21:32 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
c.urls = config.Brokers
|
2014-12-16 21:24:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Ensure we have at least one URL.
|
2015-01-07 00:21:32 +00:00
|
|
|
if len(urls) < 1 {
|
2014-10-17 04:11:28 +00:00
|
|
|
return ErrBrokerURLRequired
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Set the URLs whether they're from the config or passed in.
|
|
|
|
c.urls = urls
|
|
|
|
c.randomizeURL()
|
2014-10-17 04:11:28 +00:00
|
|
|
|
|
|
|
// Set open flag.
|
|
|
|
c.opened = true
|
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// Start background ping.
|
|
|
|
c.closing = make(chan struct{}, 0)
|
|
|
|
c.wg.Add(1)
|
|
|
|
go c.pinger(c.closing)
|
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close disconnects the client from the broker cluster.
|
|
|
|
func (c *Client) Close() error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
// Return error if the client is already closed.
|
|
|
|
if !c.opened {
|
|
|
|
return ErrClientClosed
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Close all connections.
|
|
|
|
for _, conn := range c.conns {
|
|
|
|
_ = conn.Close()
|
2015-01-07 00:21:32 +00:00
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
c.conns = nil
|
2014-10-17 04:11:28 +00:00
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// Close goroutines.
|
|
|
|
if c.closing != nil {
|
|
|
|
close(c.closing)
|
|
|
|
c.closing = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for goroutines to finish.
|
|
|
|
c.mu.Unlock()
|
|
|
|
c.wg.Wait()
|
|
|
|
c.mu.Lock()
|
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
// Unset open flag.
|
|
|
|
c.opened = false
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-24 00:54:12 +00:00
|
|
|
// Publish sends a message to the broker and returns an index or error.
|
2014-11-11 05:25:03 +00:00
|
|
|
func (c *Client) Publish(m *Message) (uint64, error) {
|
2015-03-08 06:21:44 +00:00
|
|
|
// Post message to broker.
|
|
|
|
values := url.Values{
|
|
|
|
"type": {strconv.FormatUint(uint64(m.Type), 10)},
|
|
|
|
"topicID": {strconv.FormatUint(m.TopicID, 10)},
|
|
|
|
}
|
|
|
|
resp, err := c.do("POST", "/messaging/messages", values, "application/octet-stream", bytes.NewReader(m.Data))
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf("do: %s", err)
|
|
|
|
}
|
|
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// Check response code.
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
|
|
|
|
return 0, errors.New(errstr)
|
|
|
|
}
|
|
|
|
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Parse broker index.
|
|
|
|
index, err := strconv.ParseUint(resp.Header.Get("X-Broker-Index"), 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf("invalid index: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return index, nil
|
|
|
|
}
|
2014-10-24 00:54:12 +00:00
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// Ping sends a request to the current broker to check if it is alive.
|
|
|
|
// If the broker is down then a new URL is tried.
|
|
|
|
func (c *Client) Ping() error {
|
|
|
|
// Post message to broker.
|
|
|
|
resp, err := c.do("POST", "/messaging/ping", nil, "application/octet-stream", nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
resp.Body.Close()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// do sends an HTTP request to the given path with the current leader URL.
|
|
|
|
// This will automatically retry the request if it is redirected.
|
|
|
|
func (c *Client) do(method, path string, values url.Values, contentType string, body io.Reader) (*http.Response, error) {
|
2015-01-28 06:09:50 +00:00
|
|
|
for {
|
2015-03-08 06:21:44 +00:00
|
|
|
// Generate URL.
|
|
|
|
u := c.URL()
|
|
|
|
u.Path = path
|
|
|
|
u.RawQuery = values.Encode()
|
|
|
|
|
|
|
|
// Create request.
|
|
|
|
req, err := http.NewRequest(method, u.String(), body)
|
2015-01-28 06:09:50 +00:00
|
|
|
if err != nil {
|
2015-03-08 06:21:44 +00:00
|
|
|
return nil, fmt.Errorf("new request: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Send HTTP request.
|
|
|
|
// If it cannot connect then select a different URL from the config.
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
c.randomizeURL()
|
|
|
|
return nil, err
|
2015-01-28 06:09:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// If a temporary redirect occurs then update the leader and retry.
|
|
|
|
// If a non-200 status is returned then an error occurred.
|
|
|
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
|
|
redirectURL, err := url.Parse(resp.Header.Get("Location"))
|
|
|
|
if err != nil {
|
2015-03-08 06:21:44 +00:00
|
|
|
resp.Body.Close()
|
|
|
|
return nil, fmt.Errorf("invalid redirect location: %s", resp.Header.Get("Location"))
|
2015-01-28 06:09:50 +00:00
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
c.SetURL(*redirectURL)
|
2015-01-28 06:09:50 +00:00
|
|
|
continue
|
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
|
|
|
|
return resp, nil
|
2014-10-24 00:54:12 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
}
|
|
|
|
|
2015-03-09 21:47:41 +00:00
|
|
|
// Conn returns a connection to the broker for a given topic.
|
|
|
|
func (c *Client) Conn(topicID uint64) *Conn {
|
2015-03-08 06:21:44 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
// Create connection and set current URL.
|
2015-03-09 21:47:41 +00:00
|
|
|
conn := NewConn(topicID)
|
2015-03-08 06:21:44 +00:00
|
|
|
conn.SetURL(c.url)
|
|
|
|
|
|
|
|
// Add to list of client connections.
|
|
|
|
c.conns = append(c.conns, conn)
|
|
|
|
|
2015-03-09 21:47:41 +00:00
|
|
|
return conn
|
2015-03-08 06:21:44 +00:00
|
|
|
}
|
|
|
|
|
2015-03-11 18:00:45 +00:00
|
|
|
// pinger periodically pings the broker to check that it is alive.
|
|
|
|
func (c *Client) pinger(closing chan struct{}) {
|
|
|
|
defer c.wg.Done()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-closing:
|
|
|
|
return
|
|
|
|
case <-time.After(c.PingInterval):
|
|
|
|
c.Ping()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// ClientConfig represents the configuration that must be persisted across restarts.
|
|
|
|
type ClientConfig struct {
|
|
|
|
Brokers []url.URL `json:"brokers"`
|
2015-03-10 22:27:37 +00:00
|
|
|
Leader url.URL `json:"leader"`
|
2015-03-08 06:21:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewClientConfig returns a new instance of ClientConfig.
|
|
|
|
func NewClientConfig(u []url.URL) *ClientConfig {
|
|
|
|
return &ClientConfig{
|
|
|
|
Brokers: u,
|
|
|
|
}
|
2014-10-24 00:54:12 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 21:28:43 +00:00
|
|
|
// Conn represents a stream over the client for a single topic.
|
|
|
|
type Conn struct {
|
2015-03-10 20:53:45 +00:00
|
|
|
mu sync.Mutex
|
|
|
|
topicID uint64 // topic identifier
|
|
|
|
index uint64 // highest index sent over the channel
|
|
|
|
streaming bool // use streaming reader, if true
|
|
|
|
url url.URL // current broker url
|
2015-03-08 21:28:43 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
opened bool
|
|
|
|
c chan *Message // channel streams messages from the broker.
|
|
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
closing chan struct{}
|
|
|
|
|
|
|
|
// The amount of time to wait before reconnecting to a broker stream.
|
|
|
|
ReconnectTimeout time.Duration
|
|
|
|
|
|
|
|
// The logging interface used by the connection for out-of-band errors.
|
|
|
|
Logger *log.Logger
|
2015-03-08 21:28:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewConn returns a new connection to the broker for a topic.
|
2015-03-09 21:47:41 +00:00
|
|
|
func NewConn(topicID uint64) *Conn {
|
2015-03-08 21:28:43 +00:00
|
|
|
return &Conn{
|
2015-03-09 21:47:41 +00:00
|
|
|
topicID: topicID,
|
2015-03-08 06:21:44 +00:00
|
|
|
ReconnectTimeout: DefaultReconnectTimeout,
|
|
|
|
Logger: log.New(os.Stderr, "", log.LstdFlags),
|
2015-03-08 21:28:43 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TopicID returns the connection's topic id.
|
|
|
|
func (c *Conn) TopicID() uint64 { return c.topicID }
|
|
|
|
|
|
|
|
// C returns streaming channel for the connection.
|
|
|
|
func (c *Conn) C() <-chan *Message { return c.c }
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Index returns the highest index replicated to the caller.
|
2015-03-08 21:28:43 +00:00
|
|
|
func (c *Conn) Index() uint64 {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
return c.index
|
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// SetIndex sets the highest index replicated to the caller.
|
|
|
|
func (c *Conn) SetIndex(index uint64) {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
c.index = index
|
|
|
|
}
|
|
|
|
|
2015-03-10 20:53:45 +00:00
|
|
|
// Streaming returns true if the connection streams messages continuously.
|
|
|
|
func (c *Conn) Streaming() bool {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
return c.streaming
|
|
|
|
}
|
|
|
|
|
2015-03-08 21:28:43 +00:00
|
|
|
// URL returns the current URL of the connection.
|
|
|
|
func (c *Conn) URL() url.URL {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
return c.url
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetURL sets the current URL of the connection.
|
|
|
|
func (c *Conn) SetURL(u url.URL) {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
c.url = u
|
|
|
|
}
|
|
|
|
|
|
|
|
// Open opens a streaming connection to the broker.
|
2015-03-10 20:53:45 +00:00
|
|
|
func (c *Conn) Open(index uint64, streaming bool) error {
|
2015-03-08 06:21:44 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
// Exit if aleady open or previously closed.
|
|
|
|
if c.opened {
|
|
|
|
return ErrConnOpen
|
|
|
|
} else if c.c != nil {
|
|
|
|
return ErrConnCannotReuse
|
|
|
|
}
|
|
|
|
c.opened = true
|
|
|
|
|
2015-03-09 21:47:41 +00:00
|
|
|
// Set starting index.
|
|
|
|
c.index = index
|
2015-03-10 20:53:45 +00:00
|
|
|
c.streaming = streaming
|
2015-03-09 21:47:41 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Create streaming channel.
|
|
|
|
c.c = make(chan *Message, 0)
|
|
|
|
|
|
|
|
// Start goroutines.
|
|
|
|
c.wg.Add(1)
|
|
|
|
c.closing = make(chan struct{})
|
|
|
|
go c.streamer(c.closing)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes a connection.
|
|
|
|
func (c *Conn) Close() error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
return c.close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Conn) close() error {
|
|
|
|
if !c.opened {
|
|
|
|
return ErrConnClosed
|
|
|
|
}
|
|
|
|
|
|
|
|
// Notify goroutines that the connection is closing.
|
|
|
|
if c.closing != nil {
|
|
|
|
close(c.closing)
|
|
|
|
c.closing = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for goroutines to finish.
|
|
|
|
c.mu.Unlock()
|
|
|
|
c.wg.Wait()
|
|
|
|
c.mu.Lock()
|
|
|
|
|
|
|
|
// Close channel.
|
|
|
|
close(c.c)
|
|
|
|
|
|
|
|
// Mark connection as closed.
|
|
|
|
c.opened = false
|
|
|
|
|
|
|
|
return nil
|
2015-03-08 21:28:43 +00:00
|
|
|
}
|
|
|
|
|
2015-03-01 14:06:25 +00:00
|
|
|
// Heartbeat sends a heartbeat back to the broker with the client's index.
|
2015-03-08 21:28:43 +00:00
|
|
|
func (c *Conn) Heartbeat() error {
|
2015-03-01 14:06:25 +00:00
|
|
|
var resp *http.Response
|
|
|
|
var err error
|
|
|
|
|
|
|
|
// Retrieve the parameters under lock.
|
|
|
|
c.mu.Lock()
|
2015-03-08 21:28:43 +00:00
|
|
|
topicID, index, u := c.topicID, c.index, c.url
|
2015-03-01 14:06:25 +00:00
|
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
// Send the message to the messages endpoint.
|
|
|
|
u.Path = "/messaging/heartbeat"
|
|
|
|
u.RawQuery = url.Values{
|
2015-03-08 21:28:43 +00:00
|
|
|
"topicID": {strconv.FormatUint(topicID, 10)},
|
|
|
|
"index": {strconv.FormatUint(index, 10)},
|
2015-03-01 14:06:25 +00:00
|
|
|
}.Encode()
|
|
|
|
resp, err = http.Post(u.String(), "application/octet-stream", nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
// If the server returns a redirect then it's not the leader.
|
|
|
|
// If it returns a non-200 code then return the error.
|
|
|
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
|
|
return ErrNoLeader
|
|
|
|
} else if resp.StatusCode != http.StatusOK {
|
|
|
|
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
|
|
|
|
return errors.New(errstr)
|
|
|
|
}
|
|
|
|
return fmt.Errorf("heartbeat error: %d", resp.StatusCode)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-17 04:11:28 +00:00
|
|
|
// streamer connects to a broker server and streams the replica's messages.
|
2015-03-08 06:21:44 +00:00
|
|
|
func (c *Conn) streamer(closing <-chan struct{}) {
|
|
|
|
defer c.wg.Done()
|
2014-10-17 04:11:28 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Continually connect and retry streaming from server.
|
|
|
|
var req *http.Request
|
|
|
|
var reqlock sync.Mutex
|
2014-10-17 04:11:28 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
c.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer c.wg.Done()
|
|
|
|
for {
|
|
|
|
// Check that the connection is not closing.
|
|
|
|
select {
|
|
|
|
case <-closing:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create URL.
|
|
|
|
u := c.URL()
|
|
|
|
u.Path = "/messaging/messages"
|
|
|
|
u.RawQuery = url.Values{
|
2015-03-10 20:53:45 +00:00
|
|
|
"topicID": {strconv.FormatUint(c.topicID, 10)},
|
|
|
|
"index": {strconv.FormatUint(c.Index(), 10)},
|
|
|
|
"streaming": {strconv.FormatBool(c.Streaming())},
|
2015-03-08 06:21:44 +00:00
|
|
|
}.Encode()
|
|
|
|
|
|
|
|
// Create request.
|
|
|
|
reqlock.Lock()
|
|
|
|
req, _ = http.NewRequest("GET", u.String(), nil)
|
|
|
|
reqlock.Unlock()
|
|
|
|
|
|
|
|
// Begin streaming request.
|
|
|
|
if err := c.stream(req, closing); err != nil {
|
|
|
|
c.Logger.Printf("reconnecting to broker: url=%s, err=%s", u, err)
|
|
|
|
time.Sleep(c.ReconnectTimeout)
|
|
|
|
}
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
}()
|
|
|
|
|
|
|
|
// Wait for the connection to close or the request to close.
|
|
|
|
<-closing
|
|
|
|
|
|
|
|
// Close in-flight request.
|
|
|
|
reqlock.Lock()
|
|
|
|
if req != nil {
|
|
|
|
http.DefaultTransport.(*http.Transport).CancelRequest(req)
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
2015-03-08 06:21:44 +00:00
|
|
|
reqlock.Unlock()
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// stream connects to a broker server and streams the topic messages.
|
|
|
|
func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
2014-10-17 04:11:28 +00:00
|
|
|
if err != nil {
|
2015-03-08 06:21:44 +00:00
|
|
|
return err
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
|
|
|
|
// Ensure that we received a 200 OK from the server before streaming.
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
2015-03-08 06:21:44 +00:00
|
|
|
return fmt.Errorf("invalid stream status code: %d", resp.StatusCode)
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
c.Logger.Printf("connected to broker: %s", req.URL.String())
|
2015-01-28 06:09:50 +00:00
|
|
|
|
2014-10-17 15:53:10 +00:00
|
|
|
// Continuously decode messages from request body in a separate goroutine.
|
2015-03-08 06:21:44 +00:00
|
|
|
dec := NewMessageDecoder(resp.Body)
|
|
|
|
for {
|
|
|
|
// Decode message from the stream.
|
|
|
|
m := &Message{}
|
|
|
|
if err := dec.Decode(m); err == io.EOF {
|
|
|
|
return nil
|
|
|
|
} else if err != nil {
|
|
|
|
return fmt.Errorf("decode: %s", err)
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// TODO: Write broker set updates, do not passthrough to channel.
|
2014-10-17 04:11:28 +00:00
|
|
|
|
2015-03-08 06:21:44 +00:00
|
|
|
// Write message to streaming channel.
|
2014-10-17 04:11:28 +00:00
|
|
|
select {
|
2015-03-08 06:21:44 +00:00
|
|
|
case <-closing:
|
|
|
|
return nil
|
|
|
|
case c.c <- m:
|
2014-10-17 04:11:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|