From 14fd40cdb53fdcda1e03b7b60841d8b874df8463 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 16 Oct 2014 22:11:28 -0600 Subject: [PATCH] Add streaming HTTP broker handler. --- broker/TODO | 5 +- broker/broker.go | 17 +++- broker/broker_test.go | 6 ++ broker/client.go | 162 +++++++++++++++++++++++++++++++ broker/client_test.go | 55 +++++++++++ broker/errors.go | 9 ++ broker/handler.go | 46 ++++++--- broker/handler_test.go | 85 ++++++++++++++++ cluster/cluster_configuration.go | 11 --- cluster/shard.go | 16 +-- 10 files changed, 377 insertions(+), 35 deletions(-) create mode 100644 broker/client.go create mode 100644 broker/client_test.go diff --git a/broker/TODO b/broker/TODO index 107330e165..6f5f72f8e3 100644 --- a/broker/TODO +++ b/broker/TODO @@ -3,10 +3,6 @@ Broker ## Uncompleted -- [ ] Test coverage - - [x] Broker.DeleteReplica - - [ ] Broker.Subscribe - - [ ] Broker.Unsubscribe - [ ] Cluster configuration integration - [ ] Broker FSM snapshotting - [ ] HTTP Handler @@ -26,3 +22,4 @@ Broker - [x] Broker publishing - [x] Config topic - [x] Stream topic from index +- [x] Test coverage diff --git a/broker/broker.go b/broker/broker.go index 363adc62ba..7df8450d4e 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -86,6 +86,11 @@ func (b *Broker) Close() error { // TODO: Close all topics. + // Close all replicas. + for _, r := range b.replicas { + r.closeWriter() + } + // Close raft log. _ = b.log.Close() @@ -653,6 +658,11 @@ func (r *Replica) Write(p []byte) (int, error) { return n, errReplicaUnavailable } + // If the writer has a flush method then call it. + if w, ok := r.writer.(flusher); ok { + w.Flush() + } + return n, nil } @@ -802,13 +812,18 @@ func (dec *MessageDecoder) Decode(m *Message) error { m.Data = make([]byte, binary.BigEndian.Uint32(b[10:14])) // Read data. - if _, err := io.ReadFull(dec.r, m.Data); err != nil { + if n, err := io.ReadFull(dec.r, m.Data); err != nil { + warn("io.2", n, len(m.Data), err) return err } return nil } +type flusher interface { + Flush() +} + // jsonify marshals a value to a JSON-encoded byte slice. // This should only be used with internal data that will not return marshal errors. func jsonify(v interface{}) []byte { diff --git a/broker/broker_test.go b/broker/broker_test.go index fc251b179a..e9422ac041 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -299,3 +299,9 @@ func tempfile() string { func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } + +func ok(err error) { + if err != nil { + panic("unexpected error") + } +} diff --git a/broker/client.go b/broker/client.go new file mode 100644 index 0000000000..afb8ffaf35 --- /dev/null +++ b/broker/client.go @@ -0,0 +1,162 @@ +package broker + +import ( + "errors" + "math/rand" + "net/http" + "net/url" + "sync" + "time" +) + +// ReconnectTimeout is the time to wait between stream disconnects before retrying. +const ReconnectTimeout = 100 * time.Millisecond + +// Client represents a client for the broker's HTTP API. +// Once opened, the client will stream down all messages that +type Client struct { + mu sync.Mutex + name string // the name of the client connecting. + urls []*url.URL // list of URLs for all known brokers. + + opened bool + done chan struct{} // disconnection notification + + // Channel streams messages from the broker. + // Messages can be duplicated so it is important to check the index + // of the incoming message index to make sure it has not been processed. + C chan *Message +} + +// NewClient returns a new instance of Client. +func NewClient(name string) *Client { + return &Client{ + name: name, + } +} + +// Name returns the replica name that the client was opened with. +func (c *Client) Name() string { return c.name } + +// URLs returns a list of broker URLs to connect to. +func (c *Client) URLs() []*url.URL { + c.mu.Lock() + defer c.mu.Unlock() + return c.urls +} + +// Open initializes and opens the connection to the broker cluster. +func (c *Client) Open(urls []*url.URL) error { + c.mu.Lock() + defer c.mu.Unlock() + + // Return error if the client is already open. + // Require at least one broker URL. + if c.opened { + return ErrClientOpen + } else if len(urls) == 0 { + return ErrBrokerURLRequired + } + + // Set the URLs to connect to on the client. + c.urls = urls + + // Create a channel for streaming messages. + c.C = make(chan *Message, 0) + + // Open the streamer. + c.done = make(chan struct{}) + go c.streamer(c.done) + + // Set open flag. + c.opened = true + + return nil +} + +// Close disconnects the client from the broker cluster. +func (c *Client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + // Return error if the client is already closed. + if !c.opened { + return ErrClientClosed + } + + // Close message stream. + close(c.C) + c.C = nil + + // Shutdown streamer. + close(c.done) + c.done = nil + + // Unset open flag. + c.opened = false + + return nil +} + +// streamer connects to a broker server and streams the replica's messages. +func (c *Client) streamer(done chan struct{}) { + for { + // Check for the client disconnection. + select { + case <-done: + return + default: + } + + // TODO: Validate that there is at least one broker URL. + + // Choose a random broker URL. + urls := c.URLs() + u := *urls[rand.Intn(len(urls))] + + // Connect to broker and stream. + u.Path = "/stream" + if err := c.streamFromURL(&u, done); err == errDone { + return + } + } +} + +// streamFromURL connects to a broker server and streams the replica's messages. +func (c *Client) streamFromURL(u *url.URL, done chan struct{}) error { + u.RawQuery = url.Values{"name": {c.name}}.Encode() + resp, err := http.Get(u.String()) + if err != nil { + time.Sleep(ReconnectTimeout) + return nil + } + defer func() { _ = resp.Body.Close() }() + + // Ensure that we received a 200 OK from the server before streaming. + if resp.StatusCode != http.StatusOK { + warn("status:", resp.StatusCode) + } + + // Continuously decode messages from request body. + dec := NewMessageDecoder(resp.Body) + for { + // Decode message from the stream. + m := &Message{} + if err := dec.Decode(m); err != nil { + return err + } + + // Send message to channel. + c.C <- m + + // Check for notification of disconnect. + select { + case <-done: + return errDone + default: + } + } +} + +// marker error for the streamer. +var errDone = errors.New("done") diff --git a/broker/client_test.go b/broker/client_test.go new file mode 100644 index 0000000000..bbcd3e1bfa --- /dev/null +++ b/broker/client_test.go @@ -0,0 +1,55 @@ +package broker_test + +import ( + "net/url" + "testing" + + "github.com/influxdb/influxdb/broker" +) + +// Ensure that a client can open a connect to the broker. +func TestClient_Open(t *testing.T) { + c := NewClient("node0") + defer c.Close() + + // Create replica on broker. + b := c.Handler.Broker + ok(b.CreateReplica("node0")) + + // Open client to broker. + u, _ := url.Parse(c.Handler.HTTPServer.URL) + if err := c.Open([]*url.URL{u}); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Receive a set of messages from the stream. + if m := <-c.C; m.Type != broker.CreateReplicaMessageType { + t.Fatalf("unexpected message type: %x", m.Type) + } + + // Close connection to the broker. + if err := c.Client.Close(); err != nil { + t.Fatalf("unexpected error: %s", err) + } +} + +// Client represents a test wrapper for the broker client. +type Client struct { + *broker.Client + Handler *Handler // test handler +} + +// NewClient returns a new instance of Client. +func NewClient(name string) *Client { + c := &Client{ + Client: broker.NewClient(name), + Handler: NewHandler(), + } + return c +} + +// Close shutsdown the test handler. +func (c *Client) Close() { + c.Client.Close() + c.Handler.Close() +} diff --git a/broker/errors.go b/broker/errors.go index f0875177ec..d75c212074 100644 --- a/broker/errors.go +++ b/broker/errors.go @@ -27,4 +27,13 @@ var ( // errReplicaUnavailable is returned when writing bytes to a replica when // there is no writer attached to the replica. errReplicaUnavailable = errors.New("replica unavailable") + + // ErrClientOpen is returned when opening an already open client. + ErrClientOpen = errors.New("client already open") + + // ErrClientClosed is returned when closing an already closed client. + ErrClientClosed = errors.New("client closed") + + // ErrBrokerURLRequired is returned when opening a broker without URLs. + ErrBrokerURLRequired = errors.New("broker url required") ) diff --git a/broker/handler.go b/broker/handler.go index 3eb5e20539..dc51c6b1ad 100644 --- a/broker/handler.go +++ b/broker/handler.go @@ -7,30 +7,54 @@ import ( "github.com/influxdb/influxdb/raft" ) -// HTTPHandler represents an HTTP handler by the broker. -type HTTPHandler struct { - *raft.HTTPHandler - broker *Broker +// Handler represents an HTTP handler by the broker. +type Handler struct { + raftHandler *raft.HTTPHandler + broker *Broker } -// NewHTTPHandler returns a new instance of HTTPHandler. -func NewHTTPHandler(b *Broker) *HTTPHandler { - return &HTTPHandler{ - HTTPHandler: raft.NewHTTPHandler(b.log), +// NewHandler returns a new instance of Handler. +func NewHandler(b *Broker) *Handler { + return &Handler{ + raftHandler: raft.NewHTTPHandler(b.log), broker: b, } } // ServeHTTP serves an HTTP request. -func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Delegate raft requests to its own handler. if strings.HasPrefix(r.URL.Path, "/raft") { - h.HTTPHandler.ServeHTTP(w, r) + h.raftHandler.ServeHTTP(w, r) return } // Route all InfluxDB broker requests. switch r.URL.Path { - case "/": + case "/stream": + h.serveStream(w, r) } } + +// connects the requestor as the replica's writer. +func (h *Handler) serveStream(w http.ResponseWriter, r *http.Request) { + // Retrieve the replica name. + name := r.URL.Query().Get("name") + if name == "" { + w.Header().Set("X-Broker-Error", "replica name required") + http.Error(w, "replica name required", http.StatusBadRequest) + return + } + + // Find the replica on the broker. + replica := h.broker.Replica(name) + if replica == nil { + w.Header().Set("X-Broker-Error", ErrReplicaNotFound.Error()) + http.Error(w, ErrReplicaNotFound.Error(), http.StatusNotFound) + return + } + + // Connect the response writer to the replica. + // This will block until the replica is closed or a new writer connects. + _, _ = replica.WriteTo(w) +} diff --git a/broker/handler_test.go b/broker/handler_test.go index a0afd82f95..40eca73420 100644 --- a/broker/handler_test.go +++ b/broker/handler_test.go @@ -1 +1,86 @@ package broker_test + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/influxdb/influxdb/broker" +) + +// Ensure a replica can connect and stream messages. +func TestHandler_serveStream(t *testing.T) { + h := NewHandler() + defer h.Close() + + // Create replica. + h.Broker.CreateReplica("foo") + + // Send request to stream the replica. + resp, err := http.Get(h.HTTPServer.URL + `/stream?name=foo`) + defer resp.Body.Close() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } else if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error")) + } + time.Sleep(10 * time.Millisecond) + + // Decode from body. + var m broker.Message + dec := broker.NewMessageDecoder(resp.Body) + if err := dec.Decode(&m); err != nil { + t.Fatalf("decode error: %s", err) + } else if m.Index != 2 && m.Type != broker.CreateReplicaMessageType { + t.Fatalf("unexpected index/type: %d / %x", m.Index, m.Type) + } +} + +// Ensure an error is returned when requesting a stream without a replica name. +func TestHandler_serveStream_ErrReplicaNameRequired(t *testing.T) { + h := NewHandler() + defer h.Close() + + resp, _ := http.Get(h.HTTPServer.URL + `/stream`) + defer resp.Body.Close() + if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusBadRequest || msg != "replica name required" { + t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg) + } +} + +// Ensure an error is returned when requesting a stream for a non-existent replica. +func TestHandler_serveStream_ErrReplicaNotFound(t *testing.T) { + h := NewHandler() + defer h.Close() + + resp, _ := http.Get(h.HTTPServer.URL + `/stream?name=no_such_replica`) + defer resp.Body.Close() + if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusNotFound || msg != "replica not found" { + t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg) + } +} + +// Handler is a test wrapper for broker.Handler. +type Handler struct { + *broker.Handler + Broker *Broker + HTTPServer *httptest.Server +} + +// NewHandler returns a test handler. +func NewHandler() *Handler { + b := NewBroker() + h := &Handler{ + Handler: broker.NewHandler(b.Broker), + Broker: b, + } + h.HTTPServer = httptest.NewServer(h.Handler) + return h +} + +// Close stops the server and broker and removes all temp data. +func (h *Handler) Close() { + h.Broker.Close() + h.HTTPServer.Close() +} diff --git a/cluster/cluster_configuration.go b/cluster/cluster_configuration.go index 1126cf3c02..1068bf8a61 100644 --- a/cluster/cluster_configuration.go +++ b/cluster/cluster_configuration.go @@ -20,7 +20,6 @@ import ( "github.com/influxdb/influxdb/metastore" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" - "github.com/influxdb/influxdb/wal" ) // defined by cluster config (in cluster package) @@ -35,15 +34,6 @@ type QuerySpec interface { IsRegex() bool } -type WAL interface { - AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error) - AssignSequenceNumbers(request *protocol.Request) error - Commit(requestNumber uint32, serverId uint32) error - CreateCheckpoint() error - RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error - RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error -} - type ShardCreator interface { // the shard creator expects all shards to be of the same type (long term or short term) and have the same // start and end times. This is called to create the shard set for a given duration. @@ -78,7 +68,6 @@ type ClusterConfiguration struct { addedLocalServer bool connectionCreator func(string) ServerConnection shardStore LocalShardStore - wal WAL lastShardIdUsed uint32 random *rand.Rand lastServerToGetShard *ClusterServer diff --git a/cluster/shard.go b/cluster/shard.go index 3a4fd929db..6bf1b1aebe 100644 --- a/cluster/shard.go +++ b/cluster/shard.go @@ -12,7 +12,6 @@ import ( "github.com/influxdb/influxdb/metastore" "github.com/influxdb/influxdb/parser" p "github.com/influxdb/influxdb/protocol" - "github.com/influxdb/influxdb/wal" ) // A shard implements an interface for writing and querying data. @@ -52,8 +51,6 @@ type ShardData struct { startMicro int64 endMicro int64 endTime time.Time - wal WAL - servers []wal.Server clusterServers []*ClusterServer store LocalShardStore serverIds []uint32 @@ -63,15 +60,17 @@ type ShardData struct { IsLocal bool SpaceName string Database string + + // REMOVE(broker): wal WAL + // REMOVE(broker): servers []wal.Server } -func NewShard(id uint32, startTime, endTime time.Time, database, spaceName string, wal WAL) *ShardData { +func NewShard(id uint32, startTime, endTime time.Time, database, spaceName string) *ShardData { shardDuration := endTime.Sub(startTime) return &ShardData{ id: id, startTime: startTime, endTime: endTime, - wal: wal, startMicro: common.TimeToMicroseconds(startTime), endMicro: common.TimeToMicroseconds(endTime), serverIds: make([]uint32, 0), @@ -177,9 +176,10 @@ func (self *ShardData) DropFields(fields []*metastore.Field) error { } func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error { - if assignSeqNum { - self.wal.AssignSequenceNumbers(request) - } + // FIX(broker): + //if assignSeqNum { + // self.wal.AssignSequenceNumbers(request) + //} request.ShardId = &self.id for _, server := range self.clusterServers {