influxdb/messaging/client_test.go

231 lines
6.0 KiB
Go
Raw Normal View History

2014-10-21 02:42:03 +00:00
package messaging_test
2014-10-17 04:11:28 +00:00
import (
2014-12-17 05:19:26 +00:00
"io/ioutil"
2014-10-17 04:11:28 +00:00
"net/url"
2014-12-17 05:19:26 +00:00
"os"
2014-11-13 05:32:42 +00:00
"strings"
2014-10-17 04:11:28 +00:00
"testing"
2014-10-17 15:53:10 +00:00
"time"
2014-10-17 04:11:28 +00:00
2014-10-21 02:42:03 +00:00
"github.com/influxdb/influxdb/messaging"
2014-10-17 04:11:28 +00:00
)
// Ensure the client replica id can be retrieved.
func TestClient_ReplicaID(t *testing.T) {
c := NewClient(1000)
2014-10-17 15:53:10 +00:00
defer c.Close()
if replicaID := c.ReplicaID(); replicaID != 1000 {
t.Fatalf("unexpected replica id: %s", replicaID)
2014-10-17 15:53:10 +00:00
}
}
2014-10-17 04:11:28 +00:00
// Ensure that a client can open a connect to the broker.
func TestClient_Open(t *testing.T) {
c := NewClient(1000)
2014-10-17 04:11:28 +00:00
defer c.Close()
// Create replica on broker.
c.Server.Handler.Broker().CreateReplica(1000)
2014-10-17 04:11:28 +00:00
// Open client to broker.
2014-12-17 05:19:26 +00:00
f := NewTempFile()
defer os.Remove(f)
2014-11-13 05:32:42 +00:00
u, _ := url.Parse(c.Server.URL)
2014-12-17 05:19:26 +00:00
if err := c.Open(f, []*url.URL{u}); err != nil {
2014-10-17 04:11:28 +00:00
t.Fatalf("unexpected error: %s", err)
}
2014-10-17 15:53:10 +00:00
// Receive a message from the stream.
2014-10-24 04:22:52 +00:00
if m := <-c.C(); m.Type != messaging.CreateReplicaMessageType {
2014-10-17 04:11:28 +00:00
t.Fatalf("unexpected message type: %x", m.Type)
}
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
2014-10-17 15:53:10 +00:00
// Ensure that opening an already open client returns an error.
func TestClient_Open_ErrClientOpen(t *testing.T) {
c := NewClient(1000)
2014-10-17 15:53:10 +00:00
defer c.Close()
// Open client to broker.
2014-12-17 05:19:26 +00:00
f := NewTempFile()
defer os.Remove(f)
2014-11-13 05:32:42 +00:00
u, _ := url.Parse(c.Server.URL)
2014-12-17 05:19:26 +00:00
c.Open(f, []*url.URL{u})
if err := c.Open(f, []*url.URL{u}); err != messaging.ErrClientOpen {
2014-10-17 15:53:10 +00:00
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that opening a client without a broker URL returns an error.
func TestClient_Open_ErrBrokerURLRequired(t *testing.T) {
2014-12-17 05:19:26 +00:00
t.Skip()
c := NewClient(1000)
2014-10-17 15:53:10 +00:00
defer c.Close()
2014-12-17 05:19:26 +00:00
f := NewTempFile()
defer os.Remove(f)
if err := c.Open(f, []*url.URL{}); err != messaging.ErrBrokerURLRequired {
2014-10-17 15:53:10 +00:00
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a client can close while a message is pending.
func TestClient_Close(t *testing.T) {
c := NewClient(1000)
2014-10-17 15:53:10 +00:00
defer c.Close()
// Create replica on broker.
c.Server.Handler.Broker().CreateReplica(1000)
2014-10-17 15:53:10 +00:00
// Open client to broker.
2014-12-17 05:19:26 +00:00
f := NewTempFile()
defer os.Remove(f)
2014-11-13 05:32:42 +00:00
u, _ := url.Parse(c.Server.URL)
2014-12-17 05:19:26 +00:00
if err := c.Open(f, []*url.URL{u}); err != nil {
2014-10-17 15:53:10 +00:00
t.Fatalf("unexpected error: %s", err)
}
time.Sleep(10 * time.Millisecond)
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
2014-11-13 05:32:42 +00:00
// Ensure that a client can publish messages to the broker.
func TestClient_Publish(t *testing.T) {
c := OpenClient(1000)
2014-11-13 05:32:42 +00:00
defer c.Close()
// Publish message to the broker.
if index, err := c.Publish(&messaging.Message{Type: 100, TopicID: messaging.BroadcastTopicID, Data: []byte{0}}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if index != 3 {
t.Fatalf("unexpected index: %d", index)
}
}
// Ensure that a client receives an error when publishing to a stopped server.
func TestClient_Publish_ErrConnectionRefused(t *testing.T) {
c := OpenClient(1000)
2014-11-13 05:32:42 +00:00
c.Server.Close()
defer c.Close()
// Publish message to the broker.
if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || !strings.Contains(err.Error(), "connection refused") {
t.Fatalf("unexpected error: %v", err)
}
}
// Ensure that a client receives an error when publishing to a closed broker.
func TestClient_Publish_ErrLogClosed(t *testing.T) {
c := OpenClient(1000)
2014-11-13 05:32:42 +00:00
c.Server.Handler.Broker().Close()
defer c.Close()
// Publish message to the broker.
if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || err.Error() != "log closed" {
t.Fatalf("unexpected error: %v", err)
}
}
// Ensure that a client can create a replica.
func TestClient_CreateReplica(t *testing.T) {
c := OpenClient(0)
defer c.Close()
// Create replica through client.
if err := c.CreateReplica(123); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Verify replica was created.
if r := c.Server.Handler.Broker().Replica(123); r == nil {
t.Fatalf("replica not created")
}
}
// Ensure that a client can passthrough an error while creating a replica.
func TestClient_CreateReplica_Err(t *testing.T) {
c := OpenClient(0)
defer c.Close()
c.Server.Handler.Broker().CreateReplica(123)
if err := c.CreateReplica(123); err == nil || err.Error() != `replica already exists` {
t.Fatalf("unexpected error: %v", err)
}
}
// Ensure that a client can delete a replica.
func TestClient_DeleteReplica(t *testing.T) {
c := OpenClient(0)
defer c.Close()
c.Server.Handler.Broker().CreateReplica(123)
// Delete replica through client.
if err := c.DeleteReplica(123); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Verify replica was deleted.
if r := c.Server.Handler.Broker().Replica(123); r != nil {
t.Fatalf("replica not deleted")
}
}
2014-10-17 04:11:28 +00:00
// Client represents a test wrapper for the broker client.
type Client struct {
2014-12-17 05:19:26 +00:00
clientConfig string // Temporary file for client config.
2014-10-21 02:42:03 +00:00
*messaging.Client
2014-11-13 05:32:42 +00:00
Server *Server // test server
2014-10-17 04:11:28 +00:00
}
// NewClient returns a new instance of Client.
func NewClient(replicaID uint64) *Client {
return &Client{
2014-12-17 05:19:26 +00:00
clientConfig: "", // Not all tests with NewClient require automatic temp file creation.
Client: messaging.NewClient(replicaID),
2014-12-17 05:19:26 +00:00
Server: NewServer(),
2014-11-13 05:32:42 +00:00
}
}
// OpenClient returns a new, open instance of Client.
func OpenClient(replicaID uint64) *Client {
c := NewClient(replicaID)
c.Server.Handler.Broker().CreateReplica(replicaID)
2014-11-13 05:32:42 +00:00
// Open client to broker.
2014-12-17 05:19:26 +00:00
c.clientConfig = NewTempFile()
2014-11-13 05:32:42 +00:00
u, _ := url.Parse(c.Server.URL)
2014-12-17 05:19:26 +00:00
if err := c.Open(c.clientConfig, []*url.URL{u}); err != nil {
2014-11-13 05:32:42 +00:00
panic(err)
2014-10-17 04:11:28 +00:00
}
2014-11-13 05:32:42 +00:00
time.Sleep(10 * time.Millisecond)
2014-10-17 04:11:28 +00:00
return c
}
2014-11-13 05:32:42 +00:00
// Close shuts down the client and server.
2014-10-17 04:11:28 +00:00
func (c *Client) Close() {
c.Client.Close()
2014-12-17 05:19:26 +00:00
if c.clientConfig != "" {
os.Remove(c.clientConfig)
}
2014-11-13 05:32:42 +00:00
c.Server.Close()
2014-10-17 04:11:28 +00:00
}
2014-12-17 05:19:26 +00:00
// NewTempFile returns the path of a new temporary file.
// It is up to the caller to remove it when finished.
func NewTempFile() string {
f, err := ioutil.TempFile("", "influxdb-client-test")
if err != nil {
panic(err)
}
defer f.Close()
return f.Name()
}