influxdb/messaging/handler_test.go

202 lines
6.1 KiB
Go

package messaging_test
import (
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"
"github.com/influxdb/influxdb/messaging"
)
// Ensure a replica can connect and stream messages.
func TestHandler_stream(t *testing.T) {
s := NewServer()
defer s.Close()
// Create replica.
s.Handler.Broker().CreateReplica("foo")
// Send request to stream the replica.
resp, err := http.Get(s.URL + `/messages?name=foo`)
defer resp.Body.Close()
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
}
time.Sleep(10 * time.Millisecond)
// Decode from body.
var m messaging.Message
dec := messaging.NewMessageDecoder(resp.Body)
if err := dec.Decode(&m); err != nil {
t.Fatalf("decode error: %s", err)
} else if m.Index != 2 && m.Type != messaging.CreateReplicaMessageType {
t.Fatalf("unexpected index/type: %d / %x", m.Index, m.Type)
}
}
// Ensure an error is returned when requesting a stream without a replica name.
func TestHandler_stream_ErrReplicaNameRequired(t *testing.T) {
s := NewServer()
defer s.Close()
resp, _ := http.Get(s.URL + `/messages`)
defer resp.Body.Close()
if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusBadRequest || msg != "replica name required" {
t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg)
}
}
// Ensure an error is returned when requesting a stream for a non-existent replica.
func TestHandler_stream_ErrReplicaNotFound(t *testing.T) {
s := NewServer()
defer s.Close()
resp, _ := http.Get(s.URL + `/messages?name=no_such_replica`)
defer resp.Body.Close()
if msg := resp.Header.Get("X-Broker-Error"); resp.StatusCode != http.StatusNotFound || msg != "replica not found" {
t.Fatalf("unexpected status/error: %d/%s", resp.StatusCode, msg)
}
}
// Ensure an error is returned when requesting a stream with the wrong HTTP method.
func TestHandler_stream_ErrMethodNotAllowed(t *testing.T) {
s := NewServer()
defer s.Close()
resp, _ := http.Head(s.URL + `/messages`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Fatalf("unexpected status: %d", resp.StatusCode)
}
}
// Ensure a handler can publish a message.
func TestHandler_publish(t *testing.T) {
s := NewServer()
defer s.Close()
// Stream subscription for a replica.
var m messaging.Message
s.Handler.Broker().CreateReplica("replica0")
s.Handler.Broker().Subscribe("replica0", 200)
go func() {
resp, _ := http.Get(s.URL + `/messages?name=replica0`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response code: %d", resp.StatusCode)
}
dec := messaging.NewMessageDecoder(resp.Body)
for {
if err := dec.Decode(&m); err != nil {
return
}
}
}()
// Send request to the broker.
resp, _ := http.Post(s.URL+`/messages?type=100&topicID=200`, "application/octet-stream", strings.NewReader(`abc`))
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
}
s.Handler.Broker().Sync(4)
// Check if the last message received is our new message.
time.Sleep(10 * time.Millisecond)
if !reflect.DeepEqual(&m, &messaging.Message{Type: 100, Index: 4, TopicID: 200, Data: []byte("abc")}) {
t.Fatalf("unexpected message: %#v", &m)
}
}
// Ensure a handler returns an error when publishing a message without a type.
func TestHandler_publish_ErrMessageTypeRequired(t *testing.T) {
s := NewServer()
defer s.Close()
// Send request to the broker.
resp, _ := http.Post(s.URL+`/messages?topicID=200`, "application/octet-stream", strings.NewReader(`foo`))
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Fatalf("unexpected status: %d", resp.StatusCode)
} else if resp.Header.Get("X-Broker-Error") != "message type required" {
t.Fatalf("unexpected error: %s", resp.Header.Get("X-Broker-Error"))
}
}
// Ensure a handler returns an error when publishing a message without a topic.
func TestHandler_publish_ErrTopicRequired(t *testing.T) {
s := NewServer()
defer s.Close()
// Send request to the broker.
resp, _ := http.Post(s.URL+`/messages?type=100`, "application/octet-stream", strings.NewReader(`foo`))
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Fatalf("unexpected status: %d", resp.StatusCode)
} else if resp.Header.Get("X-Broker-Error") != "topic required" {
t.Fatalf("unexpected error: %s", resp.Header.Get("X-Broker-Error"))
}
}
// Ensure a handler returns an error when publishing to a closed broker.
func TestHandler_publish_ErrClosed(t *testing.T) {
s := NewServer()
s.Handler.Broker().Close()
defer s.Close()
// Send request to the broker.
resp, _ := http.Post(s.URL+`/messages?type=100&topicID=200`, "application/octet-stream", strings.NewReader(`foo`))
defer resp.Body.Close()
if resp.StatusCode != http.StatusInternalServerError {
t.Fatalf("unexpected status: %d", resp.StatusCode)
} else if resp.Header.Get("X-Broker-Error") != "log closed" {
t.Fatalf("unexpected error: %s", resp.Header.Get("X-Broker-Error"))
}
}
// Ensure the handler routes raft requests to the raft handler.
func TestHandler_raft(t *testing.T) {
s := NewServer()
defer s.Close()
resp, _ := http.Get(s.URL + `/raft/ping`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d", resp.StatusCode)
}
}
// Ensure the handler returns an error for an invalid path.
func TestHandler_ErrNotFound(t *testing.T) {
s := NewServer()
defer s.Close()
resp, _ := http.Get(s.URL + `/no_such_path`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("unexpected status: %d", resp.StatusCode)
}
}
// Server is an test HTTP server that wraps a handler and broker.
type Server struct {
*httptest.Server
Handler *messaging.Handler
}
// NewServer returns a test server.
func NewServer() *Server {
h := messaging.NewHandler(NewBroker().Broker)
return &Server{httptest.NewServer(h), h}
}
// Close stops the server and broker and removes all temp data.
func (s *Server) Close() {
(&Broker{s.Handler.Broker()}).Close()
s.Server.Close()
}