Add streaming HTTP broker handler.

pull/903/head
Ben Johnson 2014-10-16 22:11:28 -06:00
parent 2e01f603e9
commit 14fd40cdb5
10 changed files with 377 additions and 35 deletions

View File

@ -3,10 +3,6 @@ Broker
## Uncompleted ## Uncompleted
- [ ] Test coverage
- [x] Broker.DeleteReplica
- [ ] Broker.Subscribe
- [ ] Broker.Unsubscribe
- [ ] Cluster configuration integration - [ ] Cluster configuration integration
- [ ] Broker FSM snapshotting - [ ] Broker FSM snapshotting
- [ ] HTTP Handler - [ ] HTTP Handler
@ -26,3 +22,4 @@ Broker
- [x] Broker publishing - [x] Broker publishing
- [x] Config topic - [x] Config topic
- [x] Stream topic from index - [x] Stream topic from index
- [x] Test coverage

View File

@ -86,6 +86,11 @@ func (b *Broker) Close() error {
// TODO: Close all topics. // TODO: Close all topics.
// Close all replicas.
for _, r := range b.replicas {
r.closeWriter()
}
// Close raft log. // Close raft log.
_ = b.log.Close() _ = b.log.Close()
@ -653,6 +658,11 @@ func (r *Replica) Write(p []byte) (int, error) {
return n, errReplicaUnavailable return n, errReplicaUnavailable
} }
// If the writer has a flush method then call it.
if w, ok := r.writer.(flusher); ok {
w.Flush()
}
return n, nil return n, nil
} }
@ -802,13 +812,18 @@ func (dec *MessageDecoder) Decode(m *Message) error {
m.Data = make([]byte, binary.BigEndian.Uint32(b[10:14])) m.Data = make([]byte, binary.BigEndian.Uint32(b[10:14]))
// Read data. // Read data.
if _, err := io.ReadFull(dec.r, m.Data); err != nil { if n, err := io.ReadFull(dec.r, m.Data); err != nil {
warn("io.2", n, len(m.Data), err)
return err return err
} }
return nil return nil
} }
type flusher interface {
Flush()
}
// jsonify marshals a value to a JSON-encoded byte slice. // jsonify marshals a value to a JSON-encoded byte slice.
// This should only be used with internal data that will not return marshal errors. // This should only be used with internal data that will not return marshal errors.
func jsonify(v interface{}) []byte { func jsonify(v interface{}) []byte {

View File

@ -299,3 +299,9 @@ func tempfile() string {
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
func ok(err error) {
if err != nil {
panic("unexpected error")
}
}

162
broker/client.go Normal file
View File

@ -0,0 +1,162 @@
package broker
import (
"errors"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
)
// ReconnectTimeout is the time to wait between stream disconnects before retrying.
const ReconnectTimeout = 100 * time.Millisecond
// Client represents a client for the broker's HTTP API.
// Once opened, the client will stream down all messages that
type Client struct {
mu sync.Mutex
name string // the name of the client connecting.
urls []*url.URL // list of URLs for all known brokers.
opened bool
done chan struct{} // disconnection notification
// Channel streams messages from the broker.
// Messages can be duplicated so it is important to check the index
// of the incoming message index to make sure it has not been processed.
C chan *Message
}
// NewClient returns a new instance of Client.
func NewClient(name string) *Client {
return &Client{
name: name,
}
}
// Name returns the replica name that the client was opened with.
func (c *Client) Name() string { return c.name }
// URLs returns a list of broker URLs to connect to.
func (c *Client) URLs() []*url.URL {
c.mu.Lock()
defer c.mu.Unlock()
return c.urls
}
// Open initializes and opens the connection to the broker cluster.
func (c *Client) Open(urls []*url.URL) error {
c.mu.Lock()
defer c.mu.Unlock()
// Return error if the client is already open.
// Require at least one broker URL.
if c.opened {
return ErrClientOpen
} else if len(urls) == 0 {
return ErrBrokerURLRequired
}
// Set the URLs to connect to on the client.
c.urls = urls
// Create a channel for streaming messages.
c.C = make(chan *Message, 0)
// Open the streamer.
c.done = make(chan struct{})
go c.streamer(c.done)
// Set open flag.
c.opened = true
return nil
}
// Close disconnects the client from the broker cluster.
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
// Return error if the client is already closed.
if !c.opened {
return ErrClientClosed
}
// Close message stream.
close(c.C)
c.C = nil
// Shutdown streamer.
close(c.done)
c.done = nil
// Unset open flag.
c.opened = false
return nil
}
// streamer connects to a broker server and streams the replica's messages.
func (c *Client) streamer(done chan struct{}) {
for {
// Check for the client disconnection.
select {
case <-done:
return
default:
}
// TODO: Validate that there is at least one broker URL.
// Choose a random broker URL.
urls := c.URLs()
u := *urls[rand.Intn(len(urls))]
// Connect to broker and stream.
u.Path = "/stream"
if err := c.streamFromURL(&u, done); err == errDone {
return
}
}
}
// streamFromURL connects to a broker server and streams the replica's messages.
func (c *Client) streamFromURL(u *url.URL, done chan struct{}) error {
u.RawQuery = url.Values{"name": {c.name}}.Encode()
resp, err := http.Get(u.String())
if err != nil {
time.Sleep(ReconnectTimeout)
return nil
}
defer func() { _ = resp.Body.Close() }()
// Ensure that we received a 200 OK from the server before streaming.
if resp.StatusCode != http.StatusOK {
warn("status:", resp.StatusCode)
}
// Continuously decode messages from request body.
dec := NewMessageDecoder(resp.Body)
for {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err != nil {
return err
}
// Send message to channel.
c.C <- m
// Check for notification of disconnect.
select {
case <-done:
return errDone
default:
}
}
}
// marker error for the streamer.
var errDone = errors.New("done")

55
broker/client_test.go Normal file
View File

@ -0,0 +1,55 @@
package broker_test
import (
"net/url"
"testing"
"github.com/influxdb/influxdb/broker"
)
// Ensure that a client can open a connect to the broker.
func TestClient_Open(t *testing.T) {
c := NewClient("node0")
defer c.Close()
// Create replica on broker.
b := c.Handler.Broker
ok(b.CreateReplica("node0"))
// Open client to broker.
u, _ := url.Parse(c.Handler.HTTPServer.URL)
if err := c.Open([]*url.URL{u}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Receive a set of messages from the stream.
if m := <-c.C; m.Type != broker.CreateReplicaMessageType {
t.Fatalf("unexpected message type: %x", m.Type)
}
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// Client represents a test wrapper for the broker client.
type Client struct {
*broker.Client
Handler *Handler // test handler
}
// NewClient returns a new instance of Client.
func NewClient(name string) *Client {
c := &Client{
Client: broker.NewClient(name),
Handler: NewHandler(),
}
return c
}
// Close shutsdown the test handler.
func (c *Client) Close() {
c.Client.Close()
c.Handler.Close()
}

View File

@ -27,4 +27,13 @@ var (
// errReplicaUnavailable is returned when writing bytes to a replica when // errReplicaUnavailable is returned when writing bytes to a replica when
// there is no writer attached to the replica. // there is no writer attached to the replica.
errReplicaUnavailable = errors.New("replica unavailable") errReplicaUnavailable = errors.New("replica unavailable")
// ErrClientOpen is returned when opening an already open client.
ErrClientOpen = errors.New("client already open")
// ErrClientClosed is returned when closing an already closed client.
ErrClientClosed = errors.New("client closed")
// ErrBrokerURLRequired is returned when opening a broker without URLs.
ErrBrokerURLRequired = errors.New("broker url required")
) )

View File

@ -7,30 +7,54 @@ import (
"github.com/influxdb/influxdb/raft" "github.com/influxdb/influxdb/raft"
) )
// HTTPHandler represents an HTTP handler by the broker. // Handler represents an HTTP handler by the broker.
type HTTPHandler struct { type Handler struct {
*raft.HTTPHandler raftHandler *raft.HTTPHandler
broker *Broker broker *Broker
} }
// NewHTTPHandler returns a new instance of HTTPHandler. // NewHandler returns a new instance of Handler.
func NewHTTPHandler(b *Broker) *HTTPHandler { func NewHandler(b *Broker) *Handler {
return &HTTPHandler{ return &Handler{
HTTPHandler: raft.NewHTTPHandler(b.log), raftHandler: raft.NewHTTPHandler(b.log),
broker: b, broker: b,
} }
} }
// ServeHTTP serves an HTTP request. // ServeHTTP serves an HTTP request.
func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Delegate raft requests to its own handler. // Delegate raft requests to its own handler.
if strings.HasPrefix(r.URL.Path, "/raft") { if strings.HasPrefix(r.URL.Path, "/raft") {
h.HTTPHandler.ServeHTTP(w, r) h.raftHandler.ServeHTTP(w, r)
return return
} }
// Route all InfluxDB broker requests. // Route all InfluxDB broker requests.
switch r.URL.Path { switch r.URL.Path {
case "/": case "/stream":
h.serveStream(w, r)
} }
} }
// connects the requestor as the replica's writer.
func (h *Handler) serveStream(w http.ResponseWriter, r *http.Request) {
// Retrieve the replica name.
name := r.URL.Query().Get("name")
if name == "" {
w.Header().Set("X-Broker-Error", "replica name required")
http.Error(w, "replica name required", http.StatusBadRequest)
return
}
// Find the replica on the broker.
replica := h.broker.Replica(name)
if replica == nil {
w.Header().Set("X-Broker-Error", ErrReplicaNotFound.Error())
http.Error(w, ErrReplicaNotFound.Error(), http.StatusNotFound)
return
}
// Connect the response writer to the replica.
// This will block until the replica is closed or a new writer connects.
_, _ = replica.WriteTo(w)
}

View File

@ -1 +1,86 @@
package broker_test package broker_test
import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/influxdb/influxdb/broker"
)
// Ensure a replica can connect and stream messages.
func TestHandler_serveStream(t *testing.T) {
h := NewHandler()
defer h.Close()
// Create replica.
h.Broker.CreateReplica("foo")
// Send request to stream the replica.
resp, err := http.Get(h.HTTPServer.URL + `/stream?name=foo`)
defer resp.Body.Close()
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
}
time.Sleep(10 * time.Millisecond)
// Decode from body.
var m broker.Message
dec := broker.NewMessageDecoder(resp.Body)
if err := dec.Decode(&m); err != nil {
t.Fatalf("decode error: %s", err)
} else if m.Index != 2 && m.Type != broker.CreateReplicaMessageType {
t.Fatalf("unexpected index/type: %d / %x", m.Index, m.Type)
}
}
// Ensure an error is returned when requesting a stream without a replica name.
func TestHandler_serveStream_ErrReplicaNameRequired(t *testing.T) {
h := NewHandler()
defer h.Close()
resp, _ := http.Get(h.HTTPServer.URL + `/stream`)
defer resp.Body.Close()
if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusBadRequest || msg != "replica name required" {
t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg)
}
}
// Ensure an error is returned when requesting a stream for a non-existent replica.
func TestHandler_serveStream_ErrReplicaNotFound(t *testing.T) {
h := NewHandler()
defer h.Close()
resp, _ := http.Get(h.HTTPServer.URL + `/stream?name=no_such_replica`)
defer resp.Body.Close()
if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusNotFound || msg != "replica not found" {
t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg)
}
}
// Handler is a test wrapper for broker.Handler.
type Handler struct {
*broker.Handler
Broker *Broker
HTTPServer *httptest.Server
}
// NewHandler returns a test handler.
func NewHandler() *Handler {
b := NewBroker()
h := &Handler{
Handler: broker.NewHandler(b.Broker),
Broker: b,
}
h.HTTPServer = httptest.NewServer(h.Handler)
return h
}
// Close stops the server and broker and removes all temp data.
func (h *Handler) Close() {
h.Broker.Close()
h.HTTPServer.Close()
}

View File

@ -20,7 +20,6 @@ import (
"github.com/influxdb/influxdb/metastore" "github.com/influxdb/influxdb/metastore"
"github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol" "github.com/influxdb/influxdb/protocol"
"github.com/influxdb/influxdb/wal"
) )
// defined by cluster config (in cluster package) // defined by cluster config (in cluster package)
@ -35,15 +34,6 @@ type QuerySpec interface {
IsRegex() bool IsRegex() bool
} }
type WAL interface {
AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
AssignSequenceNumbers(request *protocol.Request) error
Commit(requestNumber uint32, serverId uint32) error
CreateCheckpoint() error
RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
}
type ShardCreator interface { type ShardCreator interface {
// the shard creator expects all shards to be of the same type (long term or short term) and have the same // the shard creator expects all shards to be of the same type (long term or short term) and have the same
// start and end times. This is called to create the shard set for a given duration. // start and end times. This is called to create the shard set for a given duration.
@ -78,7 +68,6 @@ type ClusterConfiguration struct {
addedLocalServer bool addedLocalServer bool
connectionCreator func(string) ServerConnection connectionCreator func(string) ServerConnection
shardStore LocalShardStore shardStore LocalShardStore
wal WAL
lastShardIdUsed uint32 lastShardIdUsed uint32
random *rand.Rand random *rand.Rand
lastServerToGetShard *ClusterServer lastServerToGetShard *ClusterServer

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdb/influxdb/metastore" "github.com/influxdb/influxdb/metastore"
"github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/parser"
p "github.com/influxdb/influxdb/protocol" p "github.com/influxdb/influxdb/protocol"
"github.com/influxdb/influxdb/wal"
) )
// A shard implements an interface for writing and querying data. // A shard implements an interface for writing and querying data.
@ -52,8 +51,6 @@ type ShardData struct {
startMicro int64 startMicro int64
endMicro int64 endMicro int64
endTime time.Time endTime time.Time
wal WAL
servers []wal.Server
clusterServers []*ClusterServer clusterServers []*ClusterServer
store LocalShardStore store LocalShardStore
serverIds []uint32 serverIds []uint32
@ -63,15 +60,17 @@ type ShardData struct {
IsLocal bool IsLocal bool
SpaceName string SpaceName string
Database string Database string
// REMOVE(broker): wal WAL
// REMOVE(broker): servers []wal.Server
} }
func NewShard(id uint32, startTime, endTime time.Time, database, spaceName string, wal WAL) *ShardData { func NewShard(id uint32, startTime, endTime time.Time, database, spaceName string) *ShardData {
shardDuration := endTime.Sub(startTime) shardDuration := endTime.Sub(startTime)
return &ShardData{ return &ShardData{
id: id, id: id,
startTime: startTime, startTime: startTime,
endTime: endTime, endTime: endTime,
wal: wal,
startMicro: common.TimeToMicroseconds(startTime), startMicro: common.TimeToMicroseconds(startTime),
endMicro: common.TimeToMicroseconds(endTime), endMicro: common.TimeToMicroseconds(endTime),
serverIds: make([]uint32, 0), serverIds: make([]uint32, 0),
@ -177,9 +176,10 @@ func (self *ShardData) DropFields(fields []*metastore.Field) error {
} }
func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error { func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error {
if assignSeqNum { // FIX(broker):
self.wal.AssignSequenceNumbers(request) //if assignSeqNum {
} // self.wal.AssignSequenceNumbers(request)
//}
request.ShardId = &self.id request.ShardId = &self.id
for _, server := range self.clusterServers { for _, server := range self.clusterServers {