Integrate stateless broker into remaining packages.

pull/1935/head
Ben Johnson 2015-03-10 14:53:45 -06:00
parent 4160d0b785
commit 27e9132796
23 changed files with 603 additions and 648 deletions

View File

@ -1,10 +1,6 @@
package influxdb
/*
import (
"fmt"
"log"
"net/http"
"time"
"github.com/influxdb/influxdb/messaging"
@ -31,7 +27,7 @@ type Broker struct {
done chan struct{}
// send CQ processing requests to the same data node
currentCQProcessingNode *messaging.Replica
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson)
// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
@ -51,10 +47,14 @@ func NewBroker() *Broker {
// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
// FIX(benbjohnson)
// b.done = make(chan struct{})
// go b.continuousQueryLoop(b.done)
}
/*
// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {

View File

@ -180,8 +180,8 @@ func (c *Config) DataAddrUDP() string {
}
// DataURL returns the URL required to contact the data server.
func (c *Config) DataURL() *url.URL {
return &url.URL{
func (c *Config) DataURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Data.Port)),
}
@ -193,8 +193,8 @@ func (c *Config) BrokerAddr() string {
}
// BrokerURL returns the URL required to contact the Broker server.
func (c *Config) BrokerURL() *url.URL {
return &url.URL{
func (c *Config) BrokerURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Broker.Port)),
}

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/udp"
)
@ -34,20 +35,26 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir())
// Parse join urls from the --join flag.
var joinURLs []*url.URL
var joinURLs []url.URL
if join == "" {
joinURLs = parseURLs(config.JoinURLs())
} else {
joinURLs = parseURLs(join)
}
// Open broker, initialize or join as necessary.
b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter)
// Open broker & raft log, initialize or join as necessary.
b, l := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter)
// Start the broker handler.
var h *Handler
if b != nil {
h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)}
h = &Handler{
brokerHandler: &messaging.Handler{
Broker: b.Broker,
RaftHandler: &raft.Handler{Log: l},
},
}
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
@ -158,7 +165,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// unless disabled, start the loop to report anonymous usage stats every 24h
if !config.ReportingDisabled {
clusterID := b.Broker.Log().Config().ClusterID
clusterID := b.Broker.ClusterID()
go s.StartReportingLoop(version, clusterID)
}
@ -206,57 +213,67 @@ func parseConfig(path, hostname string) *Config {
}
// creates and initializes a broker.
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, w io.Writer) (*influxdb.Broker, *raft.Log) {
// Ignore if there's no existing broker and we're not initializing or joining.
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
return nil
return nil, nil
}
// Create raft log.
l := raft.NewLog()
l.SetURL(u)
l.SetLogOutput(w)
// Create broker.
b := influxdb.NewBroker()
b.Log = l
b.SetLogOutput(w)
if err := b.Open(path, u); err != nil {
// Open broker so it can feed last index data to the log.
if err := b.Open(path); err != nil {
log.Fatalf("failed to open broker: %s", err)
}
// If this is a new broker then we can initialie two ways:
// Attach the broker as the finite state machine of the raft log.
l.FSM = &messaging.RaftFSM{Broker: b}
// Open raft log inside broker directory.
if err := l.Open(filepath.Join(path, "raft")); err != nil {
log.Fatalf("raft: %s", err)
}
// If this is a new raft log then we can initialie two ways:
// 1) Start a brand new cluster.
// 2) Join an existing cluster.
if initializing {
if len(joinURLs) == 0 {
initializeBroker(b)
if err := l.Initialize(); err != nil {
log.Fatalf("initialize raft log: %s", err)
}
} else {
joinBroker(b, joinURLs)
joinLog(l, joinURLs)
}
}
return b
return b, l
}
// initializes a new broker.
func initializeBroker(b *influxdb.Broker) {
if err := b.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
}
// joins a broker to an existing cluster.
func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
// joins a raft log to an existing cluster.
func joinLog(l *raft.Log, joinURLs []url.URL) {
// Attempts to join each server until successful.
for _, u := range joinURLs {
if err := b.Join(u); err != nil {
log.Printf("join: failed to connect to broker: %s: %s", u, err)
if err := l.Join(u); err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", u, err)
} else {
log.Printf("join: connected broker to %s", u)
log.Printf("join: connected raft log to %s", u)
return
}
}
log.Fatalf("join: failed to connect broker to any specified server")
log.Fatalf("join: failed to connect raft log to any specified server")
}
// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []url.URL, w io.Writer) *influxdb.Server {
// Ignore if there's no existing server and we're not initializing or joining.
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
return nil
@ -286,13 +303,13 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
} else if !configExists {
// We are spining up a server that has no config,
// but already has an initialized data directory
joinURLs = []*url.URL{b.URL()}
joinURLs = []url.URL{b.URL()}
openServerClient(s, joinURLs, w)
} else {
if len(joinURLs) == 0 {
// If a config exists, but no joinUrls are specified, fall back to the broker URL
// TODO: Make sure we have a leader, and then spin up the server
joinURLs = []*url.URL{b.URL()}
joinURLs = []url.URL{b.URL()}
}
openServerClient(s, joinURLs, w)
}
@ -301,18 +318,13 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
}
// initializes a new server that does not yet have an ID.
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
func initializeServer(u url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
// TODO: Create replica using the messaging client.
// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
}
// Create messaging client.
c := messaging.NewClient(1)
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []url.URL{b.URL()}); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
@ -326,12 +338,12 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W
}
// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.
// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(u, joinURL); err != nil {
if err := s.Join(&u, &joinURL); err != nil {
log.Printf("join: failed to connect data node: %s: %s", u, err)
} else {
log.Printf("join: connected data node to %s", u)
@ -342,8 +354,8 @@ func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
}
// opens the messaging client and attaches it to the server.
func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) {
c := messaging.NewClient(s.ID())
func openServerClient(s *influxdb.Server, joinURLs []url.URL, w io.Writer) {
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), joinURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
@ -354,7 +366,7 @@ func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) {
}
// parses a comma-delimited list of URLs.
func parseURLs(s string) (a []*url.URL) {
func parseURLs(s string) (a []url.URL) {
if s == "" {
return nil
}
@ -364,7 +376,7 @@ func parseURLs(s string) (a []*url.URL) {
if err != nil {
log.Fatalf("cannot parse urls: %s", err)
}
a = append(a, u)
a = append(a, *u)
}
return
}

View File

@ -383,12 +383,6 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
// Retrieve data node reference.
node := h.server.DataNodeByURL(u)
// Create a new replica on the broker.
if err := h.server.Client().CreateReplica(node.ID, node.URL); err != nil {
httpError(w, err.Error(), false, http.StatusBadGateway)
return
}
// Write new node back to client.
w.WriteHeader(http.StatusCreated)
w.Header().Add("content-type", "application/json")

View File

@ -11,14 +11,13 @@ import (
"net/url"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/test"
)
func init() {
@ -137,7 +136,9 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
}
func TestHandler_Databases(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateDatabase("bar")
s := NewHTTPServer(srvr)
@ -152,7 +153,9 @@ func TestHandler_Databases(t *testing.T) {
}
func TestHandler_DatabasesPrettyPrinted(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateDatabase("bar")
s := NewHTTPServer(srvr)
@ -187,7 +190,9 @@ func TestHandler_DatabasesPrettyPrinted(t *testing.T) {
}
func TestHandler_CreateDatabase(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -200,7 +205,9 @@ func TestHandler_CreateDatabase(t *testing.T) {
}
func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -211,7 +218,9 @@ func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
}
func TestHandler_CreateDatabase_Conflict(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -225,7 +234,9 @@ func TestHandler_CreateDatabase_Conflict(t *testing.T) {
}
func TestHandler_DropDatabase(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -239,7 +250,9 @@ func TestHandler_DropDatabase(t *testing.T) {
}
func TestHandler_DropDatabase_NotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -252,7 +265,9 @@ func TestHandler_DropDatabase_NotFound(t *testing.T) {
}
func TestHandler_RetentionPolicies(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -268,7 +283,9 @@ func TestHandler_RetentionPolicies(t *testing.T) {
}
func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -282,7 +299,9 @@ func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -298,7 +317,9 @@ func TestHandler_CreateRetentionPolicy(t *testing.T) {
}
func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -321,7 +342,9 @@ func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -334,7 +357,9 @@ func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -350,7 +375,9 @@ func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -364,7 +391,9 @@ func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -394,7 +423,9 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -410,7 +441,9 @@ func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -424,7 +457,9 @@ func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -440,7 +475,9 @@ func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -457,7 +494,9 @@ func TestHandler_DeleteRetentionPolicy(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -472,7 +511,9 @@ func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
s := NewHTTPServer(srvr)
defer s.Close()
@ -488,7 +529,9 @@ func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_GzipEnabled(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -512,7 +555,9 @@ func TestHandler_GzipEnabled(t *testing.T) {
}
func TestHandler_GzipDisabled(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -536,7 +581,9 @@ func TestHandler_GzipDisabled(t *testing.T) {
}
func TestHandler_Index(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -552,7 +599,9 @@ func TestHandler_Index(t *testing.T) {
}
func TestHandler_Wait(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -568,7 +617,9 @@ func TestHandler_Wait(t *testing.T) {
}
func TestHandler_WaitIncrement(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
@ -586,7 +637,9 @@ func TestHandler_WaitIncrement(t *testing.T) {
}
func TestHandler_WaitNoIndexSpecified(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -598,7 +651,9 @@ func TestHandler_WaitNoIndexSpecified(t *testing.T) {
}
func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -610,7 +665,9 @@ func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
}
func TestHandler_WaitExpectTimeout(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -622,7 +679,9 @@ func TestHandler_WaitExpectTimeout(t *testing.T) {
}
func TestHandler_Ping(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -634,7 +693,9 @@ func TestHandler_Ping(t *testing.T) {
}
func TestHandler_PingHead(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -646,7 +707,9 @@ func TestHandler_PingHead(t *testing.T) {
}
func TestHandler_Users_MultipleUsers(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
srvr.CreateUser("mclark", "1337", true)
srvr.CreateUser("csmith", "1337", false)
@ -664,7 +727,9 @@ func TestHandler_Users_MultipleUsers(t *testing.T) {
func TestHandler_UpdateUser(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
s := NewHTTPServer(srvr)
defer s.Close()
@ -685,7 +750,9 @@ func TestHandler_UpdateUser(t *testing.T) {
func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
s := NewHTTPServer(srvr)
defer s.Close()
@ -700,7 +767,9 @@ func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
func TestHandler_DataNodes(t *testing.T) {
t.Skip()
srvr := OpenUninitializedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
srvr.CreateDataNode(MustParseURL("http://localhost:2000"))
srvr.CreateDataNode(MustParseURL("http://localhost:3000"))
@ -717,7 +786,9 @@ func TestHandler_DataNodes(t *testing.T) {
func TestHandler_CreateDataNode(t *testing.T) {
t.Skip()
srvr := OpenUninitializedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -731,7 +802,9 @@ func TestHandler_CreateDataNode(t *testing.T) {
func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -745,7 +818,9 @@ func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -759,7 +834,9 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
func TestHandler_DeleteDataNode(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
s := NewHTTPServer(srvr)
defer s.Close()
@ -774,7 +851,9 @@ func TestHandler_DeleteDataNode(t *testing.T) {
func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
t.Skip()
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -789,7 +868,9 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
// Perform a subset of endpoint testing, with authentication enabled.
func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -810,7 +891,9 @@ func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -821,7 +904,9 @@ func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -834,7 +919,9 @@ func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -847,7 +934,9 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -862,7 +951,9 @@ func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -877,7 +968,9 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
}
func TestHandler_GrantDBPrivilege(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will grant privilege to "john".
srvr.CreateUser("lisa", "password", true)
// Create user that will be granted a privilege.
@ -914,7 +1007,9 @@ func TestHandler_GrantDBPrivilege(t *testing.T) {
}
func TestHandler_RevokeAdmin(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke admin from "john".
srvr.CreateUser("lisa", "password", true)
// Create user that will have cluster admin revoked.
@ -946,7 +1041,9 @@ func TestHandler_RevokeAdmin(t *testing.T) {
}
func TestHandler_RevokeDBPrivilege(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke privilege from "john".
srvr.CreateUser("lisa", "password", true)
// Create user that will have privilege revoked.
@ -980,7 +1077,9 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) {
}
func TestHandler_DropSeries(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -1001,7 +1100,9 @@ func TestHandler_DropSeries(t *testing.T) {
}
func TestHandler_serveWriteSeries(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -1015,7 +1116,9 @@ func TestHandler_serveWriteSeries(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
@ -1033,7 +1136,9 @@ func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewAuthenticatedHTTPServer(srvr)
@ -1052,7 +1157,9 @@ func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -1070,7 +1177,9 @@ func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
}
func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -1087,7 +1196,9 @@ func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
defer s.Close()
@ -1104,7 +1215,9 @@ func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
}
func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
@ -1145,7 +1258,9 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
@ -1198,7 +1313,9 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesBatch(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
@ -1281,7 +1398,9 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) {
}
func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
@ -1322,7 +1441,9 @@ func str2iface(strs []string) []interface{} {
}
func TestHandler_ProcessContinousQueries(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
@ -1408,7 +1529,7 @@ func NewServer() *Server {
// OpenAuthenticatedServer returns a new, open test server instance with authentication enabled.
func OpenAuthenticatedServer(client influxdb.MessagingClient) *Server {
s := OpenUninitializedServer(client)
if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil {
if err := s.Initialize(url.URL{Host: "127.0.0.1:8080"}); err != nil {
panic(err.Error())
}
return s
@ -1457,72 +1578,6 @@ func OpenUninitializedServer(client influxdb.MessagingClient) *Server {
return s
}
// TODO corylanou: evaluate how much of this should be in this package
// vs. how much should be a mocked out interface
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
index uint64
c chan *messaging.Message
mu sync.Mutex // Ensure all publishing is serialized.
PublishFunc func(*messaging.Message) (uint64, error)
CreateReplicaFunc func(replicaID uint64, connectURL *url.URL) error
DeleteReplicaFunc func(replicaID uint64) error
SubscribeFunc func(replicaID, topicID uint64) error
UnsubscribeFunc func(replicaID, topicID uint64) error
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
c := &MessagingClient{c: make(chan *messaging.Message, 1)}
c.PublishFunc = c.send
c.CreateReplicaFunc = func(replicaID uint64, connectURL *url.URL) error { return nil }
c.DeleteReplicaFunc = func(replicaID uint64) error { return nil }
c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil }
c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil }
return c
}
// Publish attaches an autoincrementing index to the message.
// This function also execute's the client's PublishFunc mock function.
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.index++
m.Index = c.index
return c.PublishFunc(m)
}
// send sends the message through to the channel.
// This is the default value of PublishFunc.
func (c *MessagingClient) send(m *messaging.Message) (uint64, error) {
c.c <- m
return m.Index, nil
}
// Creates a new replica with a given ID on the broker.
func (c *MessagingClient) CreateReplica(replicaID uint64, connectURL *url.URL) error {
return c.CreateReplicaFunc(replicaID, connectURL)
}
// Deletes an existing replica with a given ID from the broker.
func (c *MessagingClient) DeleteReplica(replicaID uint64) error {
return c.DeleteReplicaFunc(replicaID)
}
// Subscribe adds a subscription to a replica for a topic on the broker.
func (c *MessagingClient) Subscribe(replicaID, topicID uint64) error {
return c.SubscribeFunc(replicaID, topicID)
}
// Unsubscribe removes a subscrition from a replica for a topic on the broker.
func (c *MessagingClient) Unsubscribe(replicaID, topicID uint64) error {
return c.UnsubscribeFunc(replicaID, topicID)
}
// C returns a channel for streaming message.
func (c *MessagingClient) C() <-chan *messaging.Message { return c.c }
// tempfile returns a temporary path.
func tempfile() string {
f, _ := ioutil.TempFile("", "influxdb-")

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"net/url"
"os"
"path/filepath"
"sort"
@ -34,6 +35,9 @@ type Broker struct {
// Log is the distributed raft log that commands are applied to.
Log interface {
URL() url.URL
Leader() (uint64, url.URL)
ClusterID() uint64
Apply(data []byte) (index uint64, err error)
}
@ -61,6 +65,18 @@ func (b *Broker) metaPath() string {
return filepath.Join(b.path, "meta")
}
// URL returns the URL of the broker.
func (b *Broker) URL() url.URL { return b.Log.URL() }
// LeaderURL returns the URL to the leader broker.
func (b *Broker) LeaderURL() url.URL {
_, u := b.Log.Leader()
return u
}
// ClusterID returns the identifier for the cluster.
func (b *Broker) ClusterID() uint64 { return b.Log.ClusterID() }
// TopicPath returns the file path to a topic's data.
// Returns a blank string if the broker is closed.
func (b *Broker) TopicPath(id uint64) string {
@ -86,10 +102,10 @@ func (b *Broker) Topic(id uint64) *Topic {
// Index returns the highest index seen by the broker across all topics.
// Returns 0 if the broker is closed.
func (b *Broker) Index() uint64 {
func (b *Broker) Index() (uint64, error) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.index
return b.index, nil
}
// opened returns true if the broker is in an open and running state.
@ -268,7 +284,7 @@ func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) {
// Read segments from disk, not topic.
segments, err := ReadSegments(t.path)
if err != nil {
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("read segments: %s", err)
}
@ -401,7 +417,7 @@ func (b *Broker) Publish(m *Message) (uint64, error) {
}
// TopicReader returns a new topic reader for a topic starting from a given index.
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) *TopicReader {
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
return NewTopicReader(b.TopicPath(topicID), index, streaming)
}
@ -617,7 +633,7 @@ func (t *Topic) Open() error {
// Read available segments.
segments, err := ReadSegments(t.path)
if err != nil {
if err != nil && !os.IsNotExist(err) {
t.close()
return fmt.Errorf("read segments: %s", err)
}
@ -669,7 +685,7 @@ func (t *Topic) close() error {
func (t *Topic) ReadIndex() (uint64, error) {
// Read a list of all segments.
segments, err := ReadSegments(t.path)
if err != nil {
if err != nil && !os.IsNotExist(err) {
return 0, fmt.Errorf("read segments: %s", err)
}
@ -839,7 +855,9 @@ func ReadSegments(path string) (Segments, error) {
func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
// Find a list of all segments.
segments, err := ReadSegments(path)
if err != nil {
if os.IsNotExist(err) {
return nil, err
} else if err != nil {
return nil, fmt.Errorf("read segments: %s", err)
}
@ -869,7 +887,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
func ReadSegmentMaxIndex(path string) (uint64, error) {
// Open segment file.
f, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return 0, err
} else if err != nil {
return 0, fmt.Errorf("open: %s", err)
}
defer func() { _ = f.Close() }()
@ -966,9 +986,11 @@ func (r *TopicReader) File() (*os.File, error) {
// If the first file hasn't been opened then open it and seek.
if r.file == nil {
// Find the segment containing the index.
// Exit if no segments are available.
// Exit if no segments are available or if path not found.
segment, err := ReadSegmentByIndex(r.path, r.index)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("segment by index: %s", err)
} else if segment == nil {
return nil, nil
@ -1032,7 +1054,9 @@ func (r *TopicReader) nextSegment() error {
// If no segments exist then exit.
// If current segment is the last segment then ignore.
segments, err := ReadSegments(r.path)
if err != nil {
if os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("read segments: %s", err)
} else if len(segments) == 0 {
return nil

View File

@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"math/rand"
"net/url"
"os"
"path/filepath"
"reflect"
@ -18,6 +19,11 @@ import (
"github.com/influxdb/influxdb/raft"
)
func init() {
// Ensure the broker matches the handler's interface.
_ = messaging.Handler{Broker: messaging.NewBroker()}
}
// Ensure that opening a broker without a path returns an error.
func TestBroker_Open_ErrPathRequired(t *testing.T) {
b := messaging.NewBroker()
@ -97,7 +103,7 @@ func TestBroker_Apply(t *testing.T) {
}
// Verify broker high water mark.
if index := b.Index(); index != 4 {
if index, _ := b.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}
}
@ -152,7 +158,7 @@ func TestBroker_Reopen(t *testing.T) {
}
// Verify broker high water mark.
if index := b.Index(); index != 4 {
if index, _ := b.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}
@ -218,7 +224,7 @@ func TestBroker_Snapshot(t *testing.T) {
}
// Verify broker high water mark.
if index := b1.Index(); index != 4 {
if index, _ := b1.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}
}
@ -710,10 +716,16 @@ func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) {
// BrokerLog is a mockable object that implements Broker.Log.
type BrokerLog struct {
ApplyFunc func(data []byte) (uint64, error)
ApplyFunc func(data []byte) (uint64, error)
ClusterIDFunc func() uint64
LeaderFunc func() (uint64, url.URL)
URLFunc func() url.URL
}
func (l *BrokerLog) Apply(data []byte) (uint64, error) { return l.ApplyFunc(data) }
func (l *BrokerLog) ClusterID() uint64 { return l.ClusterIDFunc() }
func (l *BrokerLog) Leader() (uint64, url.URL) { return l.LeaderFunc() }
func (l *BrokerLog) URL() url.URL { return l.URLFunc() }
// Messages represents a collection of messages.
// This type provides helper functions.

View File

@ -252,10 +252,11 @@ func NewClientConfig(u []url.URL) *ClientConfig {
// Conn represents a stream over the client for a single topic.
type Conn struct {
mu sync.Mutex
topicID uint64 // topic identifier
index uint64 // highest index sent over the channel
url url.URL // current broker url
mu sync.Mutex
topicID uint64 // topic identifier
index uint64 // highest index sent over the channel
streaming bool // use streaming reader, if true
url url.URL // current broker url
opened bool
c chan *Message // channel streams messages from the broker.
@ -299,6 +300,13 @@ func (c *Conn) SetIndex(index uint64) {
c.index = index
}
// Streaming returns true if the connection streams messages continuously.
func (c *Conn) Streaming() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.streaming
}
// URL returns the current URL of the connection.
func (c *Conn) URL() url.URL {
c.mu.Lock()
@ -314,7 +322,7 @@ func (c *Conn) SetURL(u url.URL) {
}
// Open opens a streaming connection to the broker.
func (c *Conn) Open(index uint64) error {
func (c *Conn) Open(index uint64, streaming bool) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -328,6 +336,7 @@ func (c *Conn) Open(index uint64) error {
// Set starting index.
c.index = index
c.streaming = streaming
// Create streaming channel.
c.c = make(chan *Message, 0)
@ -430,8 +439,9 @@ func (c *Conn) streamer(closing <-chan struct{}) {
u := c.URL()
u.Path = "/messaging/messages"
u.RawQuery = url.Values{
"topicID": {strconv.FormatUint(c.topicID, 10)},
"index": {strconv.FormatUint(c.Index(), 10)},
"topicID": {strconv.FormatUint(c.topicID, 10)},
"index": {strconv.FormatUint(c.Index(), 10)},
"streaming": {strconv.FormatBool(c.Streaming())},
}.Encode()
// Create request.
@ -479,6 +489,7 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err == io.EOF {
warn("EOF!!!")
return nil
} else if err != nil {
return fmt.Errorf("decode: %s", err)

View File

@ -31,7 +31,7 @@ func TestClient_Conn(t *testing.T) {
// Connect on topic #1.
conn1 := c.Conn(1)
if err := conn1.Open(0); err != nil {
if err := conn1.Open(0, false); err != nil {
t.Fatal(err)
} else if m := <-conn1.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 1, Data: []byte{100}}) {
t.Fatalf("unexpected message(1): %#v", m)
@ -39,7 +39,7 @@ func TestClient_Conn(t *testing.T) {
// Connect on topic #2.
conn2 := c.Conn(2)
if err := conn2.Open(0); err != nil {
if err := conn2.Open(0, false); err != nil {
t.Fatal(err)
} else if m := <-conn2.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 2, Data: []byte{200}}) {
t.Fatalf("unexpected message(2): %#v", m)
@ -54,9 +54,9 @@ func TestClient_Conn(t *testing.T) {
// Ensure that an error is returned when opening an opened connection.
func TestConn_Open_ErrConnOpen(t *testing.T) {
c := messaging.NewConn(1)
c.Open(0)
c.Open(0, false)
defer c.Close()
if err := c.Open(0); err != messaging.ErrConnOpen {
if err := c.Open(0, false); err != messaging.ErrConnOpen {
t.Fatalf("unexpected error: %s", err)
}
}
@ -64,9 +64,9 @@ func TestConn_Open_ErrConnOpen(t *testing.T) {
// Ensure that an error is returned when opening a previously closed connection.
func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
c := messaging.NewConn(1)
c.Open(0)
c.Open(0, false)
c.Close()
if err := c.Open(0); err != messaging.ErrConnCannotReuse {
if err := c.Open(0, false); err != messaging.ErrConnCannotReuse {
t.Fatalf("unexpected error: %s", err)
}
}
@ -74,7 +74,7 @@ func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
// Ensure that an error is returned when closing a closed connection.
func TestConn_Close_ErrConnClosed(t *testing.T) {
c := messaging.NewConn(1)
c.Open(0)
c.Open(0, false)
c.Close()
if err := c.Close(); err != messaging.ErrConnClosed {
t.Fatalf("unexpected error: %s", err)
@ -102,7 +102,7 @@ func TestConn_Open(t *testing.T) {
// Create and open connection to server.
c := messaging.NewConn(100)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(200); err != nil {
if err := c.Open(200, false); err != nil {
t.Fatal(err)
}
@ -139,7 +139,7 @@ func TestConn_Open_Reconnect(t *testing.T) {
// Create and open connection to server.
c := messaging.NewConn(100)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(0); err != nil {
if err := c.Open(0, false); err != nil {
t.Fatal(err)
}
@ -179,124 +179,6 @@ func TestConn_Heartbeat(t *testing.T) {
}
}
/*
// Ensure that a client can open a connect to the broker.
func TestClient_Open(t *testing.T) {
c := NewClient()
defer c.Close()
// Create replica on broker.
c.Server.Handler.Broker().PublishSync()
// Open client to broker.
f := NewTempFile()
defer os.Remove(f)
u, _ := url.Parse(c.Server.URL)
if err := c.Open(f, []*url.URL{u}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Receive messages from the stream.
if m := <-c.C(); m.Type != messaging.InternalMessageType {
t.Fatalf("message type mismatch(internal): %x", m.Type)
} else if m = <-c.C(); m.Type != messaging.CreateReplicaMessageType {
t.Fatalf("message type mismatch(create replica): %x", m.Type)
}
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that opening an already open client returns an error.
func TestClient_Open_ErrClientOpen(t *testing.T) {
c := NewClient(1000)
defer c.Close()
// Open client to broker.
f := NewTempFile()
defer os.Remove(f)
u, _ := url.Parse(c.Server.URL)
c.Open(f, []*url.URL{u})
if err := c.Open(f, []*url.URL{u}); err != messaging.ErrClientOpen {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that opening a client without a broker URL returns an error.
func TestClient_Open_ErrBrokerURLRequired(t *testing.T) {
t.Skip()
c := NewClient(1000)
defer c.Close()
f := NewTempFile()
defer os.Remove(f)
if err := c.Open(f, []*url.URL{}); err != messaging.ErrBrokerURLRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a client can close while a message is pending.
func TestClient_Close(t *testing.T) {
c := NewClient(1000)
defer c.Close()
// Create replica on broker.
c.Server.Handler.Broker().CreateReplica(1000, &url.URL{Host: "localhost"})
// Open client to broker.
f := NewTempFile()
defer os.Remove(f)
u, _ := url.Parse(c.Server.URL)
if err := c.Open(f, []*url.URL{u}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
time.Sleep(10 * time.Millisecond)
// Close connection to the broker.
if err := c.Client.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a client can publish messages to the broker.
func TestClient_Publish(t *testing.T) {
c := OpenClient(1000)
defer c.Close()
// Publish message to the broker.
if index, err := c.Publish(&messaging.Message{Type: 100, TopicID: messaging.BroadcastTopicID, Data: []byte{0}}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if index != 3 {
t.Fatalf("unexpected index: %d", index)
}
}
// Ensure that a client receives an error when publishing to a stopped server.
func TestClient_Publish_ErrConnectionRefused(t *testing.T) {
c := OpenClient(1000)
c.Server.Close()
defer c.Close()
// Publish message to the broker.
if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || !strings.Contains(err.Error(), "connection refused") {
t.Fatalf("unexpected error: %v", err)
}
}
// Ensure that a client receives an error when publishing to a closed broker.
func TestClient_Publish_ErrLogClosed(t *testing.T) {
c := OpenClient(1000)
c.Server.Handler.Broker().Close()
defer c.Close()
// Publish message to the broker.
if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || err.Error() != "log closed" {
t.Fatalf("unexpected error: %v", err)
}
}
*/
// Client represents a test wrapper for the broker client.
type Client struct {
*messaging.Client

View File

@ -3,6 +3,7 @@ package messaging
import (
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
@ -14,7 +15,7 @@ import (
// Handler represents an HTTP handler by the broker.
type Handler struct {
Broker interface {
LeaderURL() *url.URL
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64) error
@ -76,16 +77,24 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) {
defer r.Close()
// Ensure we close the topic reader if the connection is disconnected.
done := make(chan struct{}, 0)
defer close(done)
if w, ok := w.(http.CloseNotifier); ok {
go func() {
select {
case <-w.CloseNotify():
_ = r.Close()
case <-done:
return
}
}()
}
// Write out all data from the topic reader.
io.Copy(w, r)
// Automatically flush as reads come in.
if _, err := CopyFlush(w, r); err != nil {
log.Printf("message stream error: %s", err)
}
}
// postMessages publishes a message to the broker.
@ -162,7 +171,7 @@ func (h *Handler) error(w http.ResponseWriter, err error, code int) {
// redirects to the current known leader.
// If no leader is found then returns a 500.
func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) {
if u := h.Broker.LeaderURL(); u != nil {
if u := h.Broker.LeaderURL(); u.Host != "" {
redirectURL := *r.URL
redirectURL.Scheme = u.Scheme
redirectURL.Host = u.Host
@ -172,3 +181,39 @@ func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) {
h.error(w, raft.ErrNotLeader, http.StatusInternalServerError)
}
// CopyFlush copies from src to dst until EOF or an error occurs.
// Each write is proceeded by a flush, if the writer implements http.Flusher.
//
// This implementation is copied from io.Copy().
func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) {
buf := make([]byte, 32*1024)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
// Flush after write.
if dst, ok := dst.(http.Flusher); ok {
dst.Flush()
}
if ew != nil {
err = ew
break
} else if nr != nw {
err = io.ErrShortWrite
break
}
} else if er == io.EOF {
break
} else if er != nil {
err = er
break
}
}
return written, err
}

View File

@ -224,13 +224,13 @@ func TestHandler_ErrNotFound(t *testing.T) {
// HandlerBroker is a mockable type that implements Handler.Broker.
type HandlerBroker struct {
LeaderURLFunc func() *url.URL
LeaderURLFunc func() url.URL
PublishFunc func(m *messaging.Message) (uint64, error)
TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser
SetTopicMaxIndexFunc func(topicID, index uint64) error
}
func (b *HandlerBroker) LeaderURL() *url.URL { return b.LeaderURLFunc() }
func (b *HandlerBroker) LeaderURL() url.URL { return b.LeaderURLFunc() }
func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b.PublishFunc(m) }
func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
return b.TopicReaderFunc(topicID, index, streaming)

View File

@ -33,7 +33,7 @@ func (c *Config) NodeByID(id uint64) *ConfigNode {
}
// NodeByURL returns a node by URL.
func (c *Config) NodeByURL(u *url.URL) *ConfigNode {
func (c *Config) NodeByURL(u url.URL) *ConfigNode {
for _, n := range c.Nodes {
if n.URL.String() == u.String() {
return n
@ -43,11 +43,11 @@ func (c *Config) NodeByURL(u *url.URL) *ConfigNode {
}
// AddNode adds a new node to the config.
func (c *Config) AddNode(id uint64, u *url.URL) error {
func (c *Config) AddNode(id uint64, u url.URL) error {
// Validate that the id is non-zero and the url exists.
if id == 0 {
return ErrInvalidNodeID
} else if u == nil {
} else if u.Host == "" {
return ErrNodeURLRequired
}
@ -97,13 +97,13 @@ func (c *Config) Clone() *Config {
// ConfigNode represents a single machine in the raft configuration.
type ConfigNode struct {
ID uint64
URL *url.URL
URL url.URL
}
// clone returns a deep copy of the node.
func (n *ConfigNode) clone() *ConfigNode {
other := &ConfigNode{ID: n.ID, URL: &url.URL{}}
*other.URL = *n.URL
other := &ConfigNode{ID: n.ID}
other.URL = n.URL
return other
}
@ -162,11 +162,11 @@ func (dec *ConfigDecoder) Decode(c *Config) error {
if err != nil {
return err
} else if n.URL == "" {
u = nil
u = &url.URL{}
}
// Append node to config.
if err := c.AddNode(n.ID, u); err != nil {
if err := c.AddNode(n.ID, *u); err != nil {
return err
}
}

View File

@ -14,8 +14,8 @@ import (
func TestConfig_NodeByID(t *testing.T) {
c := &raft.Config{
Nodes: []*raft.ConfigNode{
{ID: 1, URL: &url.URL{Host: "localhost:8000"}},
{ID: 2, URL: &url.URL{Host: "localhost:9000"}},
{ID: 1, URL: url.URL{Host: "localhost:8000"}},
{ID: 2, URL: url.URL{Host: "localhost:9000"}},
},
}
@ -34,18 +34,18 @@ func TestConfig_NodeByID(t *testing.T) {
func TestConfig_NodeByURL(t *testing.T) {
c := &raft.Config{
Nodes: []*raft.ConfigNode{
{ID: 1, URL: &url.URL{Host: "localhost:8000"}},
{ID: 2, URL: &url.URL{Host: "localhost:9000"}},
{ID: 1, URL: url.URL{Host: "localhost:8000"}},
{ID: 2, URL: url.URL{Host: "localhost:9000"}},
},
}
// Matching nodes should return the correct node.
if n := c.NodeByURL(&url.URL{Host: "localhost:8000"}); n != c.Nodes[0] {
if n := c.NodeByURL(url.URL{Host: "localhost:8000"}); n != c.Nodes[0] {
t.Fatalf("unexpected node: %#v", n)
}
// Non-existent nodes should return nil.
if n := c.NodeByURL(&url.URL{Scheme: "http", Host: "localhost:8000"}); n != nil {
if n := c.NodeByURL(url.URL{Scheme: "http", Host: "localhost:8000"}); n != nil {
t.Fatalf("expected nil node: %#v", n)
}
}
@ -53,11 +53,11 @@ func TestConfig_NodeByURL(t *testing.T) {
// Ensure that the config can add nodes.
func TestConfig_AddNode(t *testing.T) {
var c raft.Config
c.AddNode(1, &url.URL{Host: "localhost:8000"})
c.AddNode(2, &url.URL{Host: "localhost:9000"})
if n := c.Nodes[0]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 1, URL: &url.URL{Host: "localhost:8000"}}) {
c.AddNode(1, url.URL{Host: "localhost:8000"})
c.AddNode(2, url.URL{Host: "localhost:9000"})
if n := c.Nodes[0]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 1, URL: url.URL{Host: "localhost:8000"}}) {
t.Fatalf("unexpected node(0): %#v", n)
} else if n = c.Nodes[1]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 2, URL: &url.URL{Host: "localhost:9000"}}) {
} else if n = c.Nodes[1]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 2, URL: url.URL{Host: "localhost:9000"}}) {
t.Fatalf("unexpected node(1): %#v", n)
}
}
@ -65,8 +65,8 @@ func TestConfig_AddNode(t *testing.T) {
// Ensure that the config can remove nodes.
func TestConfig_RemoveNode(t *testing.T) {
var c raft.Config
c.AddNode(1, &url.URL{Host: "localhost:8000"})
c.AddNode(2, &url.URL{Host: "localhost:9000"})
c.AddNode(1, url.URL{Host: "localhost:8000"})
c.AddNode(2, url.URL{Host: "localhost:9000"})
if err := c.RemoveNode(1); err != nil {
t.Fatalf("unexpected error(0): %s", err)
} else if err = c.RemoveNode(2); err != nil {
@ -83,8 +83,8 @@ func TestConfigEncoder_Encode(t *testing.T) {
Index: 20,
MaxNodeID: 3,
Nodes: []*raft.ConfigNode{
{ID: 1, URL: &url.URL{Host: "localhost:8000"}},
{ID: 2, URL: &url.URL{Host: "localhost:9000"}},
{ID: 1, URL: url.URL{Host: "localhost:8000"}},
{ID: 2, URL: url.URL{Host: "localhost:9000"}},
},
}
@ -103,8 +103,8 @@ func TestConfigDecoder_Decode(t *testing.T) {
Index: 20,
MaxNodeID: 3,
Nodes: []*raft.ConfigNode{
{ID: 1, URL: &url.URL{Host: "localhost:8000"}},
{ID: 2, URL: &url.URL{Host: "localhost:9000"}},
{ID: 1, URL: url.URL{Host: "localhost:8000"}},
{ID: 2, URL: url.URL{Host: "localhost:9000"}},
},
}

View File

@ -12,7 +12,7 @@ import (
// Handler represents an HTTP endpoint for Raft to communicate over.
type Handler struct {
Log interface {
AddPeer(u *url.URL) (id uint64, leaderID uint64, config *Config, err error)
AddPeer(u url.URL) (id uint64, leaderID uint64, config *Config, err error)
RemovePeer(id uint64) error
Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesTo(w io.Writer, id, term, index uint64) error
@ -60,7 +60,7 @@ func (h *Handler) serveJoin(w http.ResponseWriter, r *http.Request) {
}
// Add peer to the log.
id, leaderID, config, err := h.Log.AddPeer(u)
id, leaderID, config, err := h.Log.AddPeer(*u)
if err != nil {
w.Header().Set("X-Raft-Error", err.Error())
w.WriteHeader(http.StatusInternalServerError)

View File

@ -11,10 +11,15 @@ import (
"github.com/influxdb/influxdb/raft"
)
func init() {
// Ensure Log implements the Handler.Log interface.
_ = raft.Handler{Log: raft.NewLog()}
}
// Ensure a node can join a cluster over HTTP.
func TestHandler_HandleJoin(t *testing.T) {
h := NewHandler()
h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) {
h.AddPeerFunc = func(u url.URL) (uint64, uint64, *raft.Config, error) {
if u.String() != "http://localhost:1000" {
t.Fatalf("unexpected url: %s", u)
}
@ -42,7 +47,7 @@ func TestHandler_HandleJoin(t *testing.T) {
// Ensure that joining with an invalid query string with return an error.
func TestHandler_HandleJoin_Error(t *testing.T) {
h := NewHandler()
h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) {
h.AddPeerFunc = func(u url.URL) (uint64, uint64, *raft.Config, error) {
return 0, 0, nil, raft.ErrClosed
}
s := httptest.NewServer(h)
@ -364,7 +369,7 @@ func TestHandler_Ping(t *testing.T) {
// Handler represents a test wrapper for the raft.Handler.
type Handler struct {
*raft.Handler
AddPeerFunc func(u *url.URL) (uint64, uint64, *raft.Config, error)
AddPeerFunc func(u url.URL) (uint64, uint64, *raft.Config, error)
RemovePeerFunc func(id uint64) error
HeartbeatFunc func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesToFunc func(w io.Writer, id, term, index uint64) error
@ -378,8 +383,8 @@ func NewHandler() *Handler {
return h
}
func (h *Handler) AddPeer(u *url.URL) (uint64, uint64, *raft.Config, error) { return h.AddPeerFunc(u) }
func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) }
func (h *Handler) AddPeer(u url.URL) (uint64, uint64, *raft.Config, error) { return h.AddPeerFunc(u) }
func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) }
func (h *Handler) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
return h.HeartbeatFunc(term, commitIndex, leaderID)

View File

@ -104,18 +104,18 @@ type Log struct {
closing chan struct{} // close notification
// Network address to the reach the log.
URL *url.URL
url url.URL
// The state machine that log entries will be applied to.
FSM FSM
// The transport used to communicate with other nodes in the cluster.
Transport interface {
Join(u *url.URL, nodeURL *url.URL) (id uint64, leaderID uint64, config *Config, err error)
Leave(u *url.URL, id uint64) error
Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error)
ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, error)
RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error
Join(u url.URL, nodeURL url.URL) (id uint64, leaderID uint64, config *Config, err error)
Leave(u url.URL, id uint64) error
Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error)
ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error)
RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error
}
// Clock is an abstraction of time.
@ -154,6 +154,25 @@ func NewLog() *Log {
// Returns an empty string if the log is closed.
func (l *Log) Path() string { return l.path }
// URL returns the URL for the log.
func (l *Log) URL() url.URL {
l.mu.Lock()
defer l.mu.Unlock()
return l.url
}
// SetURL sets the URL for the log. This must be set before opening.
func (l *Log) SetURL(u url.URL) {
l.mu.Lock()
defer l.mu.Unlock()
if l.opened() {
panic("url cannot be set while log is open")
}
l.url = u
}
func (l *Log) idPath() string { return filepath.Join(l.path, "id") }
func (l *Log) termPath() string { return filepath.Join(l.path, "term") }
func (l *Log) configPath() string { return filepath.Join(l.path, "config") }
@ -459,7 +478,7 @@ func (l *Log) Initialize() error {
// Generate a new configuration with one node.
config = &Config{MaxNodeID: id}
config.AddNode(id, l.URL)
config.AddNode(id, l.url)
// Generate new 8-hex digit cluster identifier.
config.ClusterID = uint64(l.Rand())
@ -511,8 +530,8 @@ func (l *Log) SetLogOutput(w io.Writer) {
func (l *Log) updateLogPrefix() {
var host string
if l.URL != nil {
host = l.URL.Host
if l.url.Host != "" {
host = l.url.Host
}
l.Logger.SetPrefix(fmt.Sprintf("[raft] %s ", host))
}
@ -533,13 +552,24 @@ func (l *Log) tracef(msg string, v ...interface{}) {
// Leader returns the id and URL associated with the current leader.
// Returns zero if there is no current leader.
func (l *Log) Leader() (id uint64, u *url.URL) {
func (l *Log) Leader() (id uint64, u url.URL) {
l.mu.Lock()
defer l.mu.Unlock()
return l.leader()
}
func (l *Log) leader() (id uint64, u *url.URL) {
// ClusterID returns the identifier for the cluster.
// Returns zero if the cluster has not been initialized yet.
func (l *Log) ClusterID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
if l.config == nil {
return 0
}
return l.config.ClusterID
}
func (l *Log) leader() (id uint64, u url.URL) {
// Ignore if there's no configuration set.
if l.config == nil {
return
@ -556,9 +586,9 @@ func (l *Log) leader() (id uint64, u *url.URL) {
// Join contacts a node in the cluster to request membership.
// A log cannot join a cluster if it has already been initialized.
func (l *Log) Join(u *url.URL) error {
func (l *Log) Join(u url.URL) error {
// Validate under lock.
var nodeURL *url.URL
var nodeURL url.URL
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
@ -567,11 +597,11 @@ func (l *Log) Join(u *url.URL) error {
return ErrClosed
} else if l.id != 0 {
return ErrInitialized
} else if l.URL == nil {
} else if l.url.Host == "" {
return ErrURLRequired
}
nodeURL = l.URL
nodeURL = l.url
return nil
}(); err != nil {
return err
@ -727,7 +757,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{})
l.mu.Unlock()
// If no leader exists then wait momentarily and retry.
if u == nil {
if u.Host == "" {
l.tracef("readFromLeader: no leader")
time.Sleep(100 * time.Millisecond)
continue
@ -1309,9 +1339,9 @@ func (l *Log) mustApplyRemovePeer(e *LogEntry) error {
// AddPeer creates a new peer in the cluster.
// Returns the new peer's identifier and the current configuration.
func (l *Log) AddPeer(u *url.URL) (uint64, uint64, *Config, error) {
func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) {
// Validate URL.
if u == nil {
if u.Host == "" {
return 0, 0, nil, fmt.Errorf("peer url required")
}

View File

@ -19,7 +19,7 @@ import (
// Ensure that opening an already open log returns an error.
func TestLog_Open_ErrOpen(t *testing.T) {
l := NewInitializedLog(&url.URL{Host: "log0"})
l := NewInitializedLog(url.URL{Host: "log0"})
defer l.Close()
if err := l.Open(tempfile()); err != raft.ErrOpen {
t.Fatal("expected error")
@ -28,7 +28,7 @@ func TestLog_Open_ErrOpen(t *testing.T) {
// Ensure that a log can be checked for being open.
func TestLog_Opened(t *testing.T) {
l := NewInitializedLog(&url.URL{Host: "log0"})
l := NewInitializedLog(url.URL{Host: "log0"})
if l.Opened() != true {
t.Fatalf("expected open")
}
@ -40,7 +40,7 @@ func TestLog_Opened(t *testing.T) {
// Ensure that reopening an existing log will restore its ID.
func TestLog_Reopen(t *testing.T) {
l := NewInitializedLog(&url.URL{Host: "log0"})
l := NewInitializedLog(url.URL{Host: "log0"})
if l.ID() != 1 {
t.Fatalf("expected id == 1")
}
@ -64,7 +64,7 @@ func TestLog_Reopen(t *testing.T) {
// Ensure that a single node-cluster can apply a log entry.
func TestLog_Apply(t *testing.T) {
l := NewInitializedLog(&url.URL{Host: "log0"})
l := NewInitializedLog(url.URL{Host: "log0"})
defer l.Close()
// Apply a command.
@ -87,7 +87,7 @@ func TestLog_Apply(t *testing.T) {
// Ensure that a node has no configuration after it's closed.
func TestLog_Config_Closed(t *testing.T) {
l := NewInitializedLog(&url.URL{Host: "log0"})
l := NewInitializedLog(url.URL{Host: "log0"})
defer l.Close()
l.Log.Close()
if l.Config() != nil {
@ -353,12 +353,13 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster {
logN := 3
for i := 0; i < logN; i++ {
l := NewLog(&url.URL{Host: fmt.Sprintf("log%d", i)})
l := NewLog(url.URL{Host: fmt.Sprintf("log%d", i)})
l.Log.FSM = fsmFn()
l.Transport = t
c.Logs = append(c.Logs, l)
t.register(l.Log)
warnf("Log %s: %p", l.URL.String(), l.Log)
u := l.URL()
warnf("Log %s: %p", u.String(), l.Log)
}
warn("")
@ -372,7 +373,7 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster {
c.Logs[0].MustWaitUncommitted(2)
c.Logs[0].Clock.apply()
}()
if err := c.Logs[1].Join(c.Logs[0].URL); err != nil {
if err := c.Logs[1].Join(c.Logs[0].URL()); err != nil {
panic("join: " + err.Error())
}
c.Logs[0].Clock.heartbeat()
@ -390,7 +391,7 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster {
c.Logs[1].Clock.apply()
c.Logs[2].Clock.apply()
}()
if err := c.Logs[2].Log.Join(c.Logs[0].Log.URL); err != nil {
if err := c.Logs[2].Log.Join(c.Logs[0].Log.URL()); err != nil {
panic("join: " + err.Error())
}
@ -409,14 +410,15 @@ func NewRealTimeCluster(logN int, fsmFn func() raft.FSM) *Cluster {
t := NewTransport()
for i := 0; i < logN; i++ {
l := NewLog(&url.URL{Host: fmt.Sprintf("log%d", i)})
l := NewLog(url.URL{Host: fmt.Sprintf("log%d", i)})
l.Log.FSM = fsmFn()
l.Clock = nil
l.Log.Clock = raft.NewClock()
l.Transport = t
c.Logs = append(c.Logs, l)
t.register(l.Log)
warnf("Log %s: %p", l.URL.String(), l.Log)
u := l.URL()
warnf("Log %s: %p", u.String(), l.Log)
}
warn("")
@ -427,7 +429,7 @@ func NewRealTimeCluster(logN int, fsmFn func() raft.FSM) *Cluster {
// Join remaining nodes.
for i := 1; i < logN; i++ {
c.Logs[i].MustOpen()
c.Logs[i].MustJoin(c.Logs[0].URL)
c.Logs[i].MustJoin(c.Logs[0].URL())
}
// Ensure nodes are ready.
@ -494,9 +496,9 @@ type Log struct {
}
// NewLog returns a new instance of Log.
func NewLog(u *url.URL) *Log {
func NewLog(u url.URL) *Log {
l := &Log{Log: raft.NewLog(), Clock: NewClock()}
l.URL = u
l.SetURL(u)
l.Log.Clock = l.Clock
l.Rand = seq()
l.DebugEnabled = true
@ -507,7 +509,7 @@ func NewLog(u *url.URL) *Log {
}
// NewInitializedLog returns a new initialized Node.
func NewInitializedLog(u *url.URL) *Log {
func NewInitializedLog(u url.URL) *Log {
l := NewLog(u)
l.Log.FSM = &FSM{}
l.MustOpen()
@ -536,7 +538,7 @@ func (l *Log) MustInitialize() {
}
// MustJoin joins the log to another log. Panic on error.
func (l *Log) MustJoin(u *url.URL) {
func (l *Log) MustJoin(u url.URL) {
if err := l.Join(u); err != nil {
panic("join: " + err.Error())
}
@ -552,21 +554,24 @@ func (l *Log) Close() error {
// MustWaits waits for at least a given applied index. Panic on error.
func (l *Log) MustWait(index uint64) {
if err := l.Log.Wait(index); err != nil {
panic(l.URL.String() + " wait: " + err.Error())
u := l.URL()
panic(u.String() + " wait: " + err.Error())
}
}
// MustCommitted waits for at least a given committed index. Panic on error.
func (l *Log) MustWaitCommitted(index uint64) {
if err := l.Log.WaitCommitted(index); err != nil {
panic(l.URL.String() + " wait committed: " + err.Error())
u := l.URL()
panic(u.String() + " wait committed: " + err.Error())
}
}
// MustWaitUncommitted waits for at least a given uncommitted index. Panic on error.
func (l *Log) MustWaitUncommitted(index uint64) {
if err := l.Log.WaitUncommitted(index); err != nil {
panic(l.URL.String() + " wait uncommitted: " + err.Error())
u := l.URL()
panic(u.String() + " wait uncommitted: " + err.Error())
}
}

View File

@ -15,9 +15,9 @@ import (
type HTTPTransport struct{}
// Join requests membership into a node's cluster.
func (t *HTTPTransport) Join(uri *url.URL, nodeURL *url.URL) (uint64, uint64, *Config, error) {
func (t *HTTPTransport) Join(uri url.URL, nodeURL url.URL) (uint64, uint64, *Config, error) {
// Construct URL.
u := *uri
u := uri
u.Path = path.Join(u.Path, "raft/join")
u.RawQuery = (&url.Values{"url": {nodeURL.String()}}).Encode()
@ -55,9 +55,9 @@ func (t *HTTPTransport) Join(uri *url.URL, nodeURL *url.URL) (uint64, uint64, *C
}
// Leave removes a node from a cluster's membership.
func (t *HTTPTransport) Leave(uri *url.URL, id uint64) error {
func (t *HTTPTransport) Leave(uri url.URL, id uint64) error {
// Construct URL.
u := *uri
u := uri
u.Path = path.Join(u.Path, "raft/leave")
u.RawQuery = (&url.Values{"id": {strconv.FormatUint(id, 10)}}).Encode()
@ -77,9 +77,9 @@ func (t *HTTPTransport) Leave(uri *url.URL, id uint64) error {
}
// Heartbeat checks the status of a follower.
func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint64) (uint64, error) {
func (t *HTTPTransport) Heartbeat(uri url.URL, term, commitIndex, leaderID uint64) (uint64, error) {
// Construct URL.
u := *uri
u := uri
u.Path = path.Join(u.Path, "raft/heartbeat")
// Set URL parameters.
@ -112,9 +112,9 @@ func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint
}
// ReadFrom streams the log from a leader.
func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadCloser, error) {
func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCloser, error) {
// Construct URL.
u := *uri
u := uri
u.Path = path.Join(u.Path, "raft/stream")
// Set URL parameters.
@ -140,9 +140,9 @@ func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadC
}
// RequestVote requests a vote for a candidate in a given term.
func (t *HTTPTransport) RequestVote(uri *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
// Construct URL.
u := *uri
u := uri
u.Path = path.Join(u.Path, "raft/vote")
// Set URL parameters.

View File

@ -34,7 +34,7 @@ func TestHTTPTransport_Join(t *testing.T) {
// Execute join against test server.
u, _ := url.Parse(s.URL)
id, leaderID, config, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"})
id, leaderID, config, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"})
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if id != 1 {
@ -48,7 +48,7 @@ func TestHTTPTransport_Join(t *testing.T) {
// Ensure that joining a server that doesn't exist returns an error.
func TestHTTPTransport_Join_ErrConnectionRefused(t *testing.T) {
_, _, _, err := (&raft.HTTPTransport{}).Join(&url.URL{Scheme: "http", Host: "localhost:27322"}, &url.URL{Host: "local"})
_, _, _, err := (&raft.HTTPTransport{}).Join(url.URL{Scheme: "http", Host: "localhost:27322"}, url.URL{Host: "local"})
if err == nil || !strings.Contains(err.Error(), "connection refused") {
t.Fatalf("unexpected error: %s", err)
}
@ -64,7 +64,7 @@ func TestHTTPTransport_Join_ErrInvalidID(t *testing.T) {
// Execute join against test server.
u, _ := url.Parse(s.URL)
_, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"})
_, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"})
if err == nil || err.Error() != `invalid id: "xxx"` {
t.Fatalf("unexpected error: %s", err)
}
@ -82,7 +82,7 @@ func TestHTTPTransport_Join_ErrInvalidConfig(t *testing.T) {
// Execute join against test server.
u, _ := url.Parse(s.URL)
_, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"})
_, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"})
if err == nil || err.Error() != `config unmarshal: unexpected EOF` {
t.Fatalf("unexpected error: %s", err)
}
@ -99,7 +99,7 @@ func TestHTTPTransport_Join_Err(t *testing.T) {
// Execute join against test server.
u, _ := url.Parse(s.URL)
_, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"})
_, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"})
if err == nil || err.Error() != `oh no` {
t.Fatalf("unexpected error: %s", err)
}
@ -119,14 +119,14 @@ func TestHTTPTransport_Leave(t *testing.T) {
// Execute leave against test server.
u, _ := url.Parse(s.URL)
if err := (&raft.HTTPTransport{}).Leave(u, 1); err != nil {
if err := (&raft.HTTPTransport{}).Leave(*u, 1); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that leaving a server that doesn't exist returns an error.
func TestHTTPTransport_Leave_ErrConnectionRefused(t *testing.T) {
err := (&raft.HTTPTransport{}).Leave(&url.URL{Scheme: "http", Host: "localhost:27322"}, 1)
err := (&raft.HTTPTransport{}).Leave(url.URL{Scheme: "http", Host: "localhost:27322"}, 1)
if err == nil || !strings.Contains(err.Error(), "connection refused") {
t.Fatalf("unexpected error: %s", err)
}
@ -142,7 +142,7 @@ func TestHTTPTransport_Leave_Err(t *testing.T) {
// Execute leave against test server.
u, _ := url.Parse(s.URL)
err := (&raft.HTTPTransport{}).Leave(u, 1)
err := (&raft.HTTPTransport{}).Leave(*u, 1)
if err == nil || err.Error() != `oh no` {
t.Fatalf("unexpected error: %s", err)
}
@ -171,7 +171,7 @@ func TestHTTPTransport_Heartbeat(t *testing.T) {
// Execute heartbeat against test server.
u, _ := url.Parse(s.URL)
newIndex, err := (&raft.HTTPTransport{}).Heartbeat(u, 1, 2, 3)
newIndex, err := (&raft.HTTPTransport{}).Heartbeat(*u, 1, 2, 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if newIndex != 4 {
@ -198,7 +198,7 @@ func TestHTTPTransport_Heartbeat_Err(t *testing.T) {
}))
u, _ := url.Parse(s.URL)
_, err := (&raft.HTTPTransport{}).Heartbeat(u, 1, 2, 3)
_, err := (&raft.HTTPTransport{}).Heartbeat(*u, 1, 2, 3)
if err == nil {
t.Errorf("%d. expected error", i)
} else if tt.err != err.Error() {
@ -211,7 +211,7 @@ func TestHTTPTransport_Heartbeat_Err(t *testing.T) {
// Ensure an HTTP heartbeat to a stopped server returns an error.
func TestHTTPTransport_Heartbeat_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
_, err := (&raft.HTTPTransport{}).Heartbeat(u, 0, 0, 0)
_, err := (&raft.HTTPTransport{}).Heartbeat(*u, 0, 0, 0)
if err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
@ -241,7 +241,7 @@ func TestHTTPTransport_ReadFrom(t *testing.T) {
// Execute stream against test server.
u, _ := url.Parse(s.URL)
r, err := (&raft.HTTPTransport{}).ReadFrom(u, 1, 2, 3)
r, err := (&raft.HTTPTransport{}).ReadFrom(*u, 1, 2, 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -261,7 +261,7 @@ func TestHTTPTransport_ReadFrom_Err(t *testing.T) {
// Execute stream against test server.
u, _ := url.Parse(s.URL)
r, err := (&raft.HTTPTransport{}).ReadFrom(u, 0, 0, 0)
r, err := (&raft.HTTPTransport{}).ReadFrom(*u, 0, 0, 0)
if err == nil {
t.Fatalf("expected error")
} else if err.Error() != `bad stream` {
@ -274,7 +274,7 @@ func TestHTTPTransport_ReadFrom_Err(t *testing.T) {
// Ensure an streaming over HTTP to a stopped server returns an error.
func TestHTTPTransport_ReadFrom_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
_, err := (&raft.HTTPTransport{}).ReadFrom(u, 0, 0, 0)
_, err := (&raft.HTTPTransport{}).ReadFrom(*u, 0, 0, 0)
if err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
@ -307,7 +307,7 @@ func TestHTTPTransport_RequestVote(t *testing.T) {
// Execute heartbeat against test server.
u, _ := url.Parse(s.URL)
if err := (&raft.HTTPTransport{}).RequestVote(u, 1, 2, 3, 4); err != nil {
if err := (&raft.HTTPTransport{}).RequestVote(*u, 1, 2, 3, 4); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
@ -322,7 +322,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) {
defer s.Close()
u, _ := url.Parse(s.URL)
if err := (&raft.HTTPTransport{}).RequestVote(u, 0, 0, 0, 0); err == nil {
if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
t.Errorf("expected error")
} else if err.Error() != `already voted` {
t.Errorf("unexpected error: %s", err)
@ -332,7 +332,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) {
// Ensure that requesting a vote over HTTP to a stopped server returns an error.
func TestHTTPTransport_RequestVote_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
if err := (&raft.HTTPTransport{}).RequestVote(u, 0, 0, 0, 0); err == nil {
if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
@ -352,11 +352,11 @@ func NewTransport() *Transport {
// register registers a log by hostname.
func (t *Transport) register(l *raft.Log) {
t.logs[l.URL.Host] = l
t.logs[l.URL().Host] = l
}
// log returns a log registered by hostname.
func (t *Transport) log(u *url.URL) (*raft.Log, error) {
func (t *Transport) log(u url.URL) (*raft.Log, error) {
if l := t.logs[u.Host]; l != nil {
return l, nil
}
@ -364,7 +364,7 @@ func (t *Transport) log(u *url.URL) (*raft.Log, error) {
}
// Join calls the AddPeer method on the target log.
func (t *Transport) Join(u *url.URL, nodeURL *url.URL) (uint64, uint64, *raft.Config, error) {
func (t *Transport) Join(u url.URL, nodeURL url.URL) (uint64, uint64, *raft.Config, error) {
l, err := t.log(u)
if err != nil {
return 0, 0, nil, err
@ -373,7 +373,7 @@ func (t *Transport) Join(u *url.URL, nodeURL *url.URL) (uint64, uint64, *raft.Co
}
// Leave calls the RemovePeer method on the target log.
func (t *Transport) Leave(u *url.URL, id uint64) error {
func (t *Transport) Leave(u url.URL, id uint64) error {
l, err := t.log(u)
if err != nil {
return err
@ -382,7 +382,7 @@ func (t *Transport) Leave(u *url.URL, id uint64) error {
}
// Heartbeat calls the Heartbeat method on the target log.
func (t *Transport) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) {
func (t *Transport) Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) {
l, err := t.log(u)
if err != nil {
return 0, err
@ -391,7 +391,7 @@ func (t *Transport) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (l
}
// ReadFrom streams entries from the target log.
func (t *Transport) ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, error) {
func (t *Transport) ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error) {
l, err := t.log(u)
if err != nil {
return nil, err
@ -409,7 +409,7 @@ func (t *Transport) ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser,
}
// RequestVote calls RequestVote() on the target log.
func (t *Transport) RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
func (t *Transport) RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
l, err := t.log(u)
if err != nil {
return err

View File

@ -355,7 +355,7 @@ func (s *Server) setClient(client MessagingClient) error {
if client != nil {
// Create connection for broadcast channel.
conn := client.Conn(BroadcastTopicID)
if err := conn.Open(s.index); err != nil {
if err := conn.Open(s.index, true); err != nil {
return fmt.Errorf("open conn: %s", err)
}
@ -417,16 +417,16 @@ func (s *Server) Sync(index uint64) error {
}
// Initialize creates a new data node and initializes the server's id to 1.
func (s *Server) Initialize(u *url.URL) error {
func (s *Server) Initialize(u url.URL) error {
// Create a new data node.
if err := s.CreateDataNode(u); err != nil {
if err := s.CreateDataNode(&u); err != nil {
return err
}
// Ensure the data node returns with an ID of 1.
// If it doesn't then something went really wrong. We have to panic because
// the messaging client relies on the first server being assigned ID 1.
n := s.DataNodeByURL(u)
n := s.DataNodeByURL(&u)
assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID)
// Set the ID on the metastore.
@ -2816,16 +2816,33 @@ func (r *Results) Error() error {
// MessagingClient represents the client used to connect to brokers.
type MessagingClient interface {
Open(path string, urls []url.URL) error
Close() error
// Publishes a message to the broker.
Publish(m *messaging.Message) (index uint64, err error)
// Conn returns an open, streaming connection to a topic.
Conn(topicID uint64) MessagingConn
// Sets the logging destination.
SetLogOutput(w io.Writer)
}
type messagingClient struct {
*messaging.Client
}
// NewMessagingClient returns an instance of MessagingClient.
func NewMessagingClient() MessagingClient {
return &messagingClient{messaging.NewClient()}
}
func (c *messagingClient) Conn(topicID uint64) MessagingConn { return c.Client.Conn(topicID) }
// MessagingConn represents a streaming connection to a single broker topic.
type MessagingConn interface {
Open(index uint64) error
Open(index uint64, streaming bool) error
C() <-chan *messaging.Message
}

View File

@ -10,13 +10,12 @@ import (
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/test"
"golang.org/x/crypto/bcrypt"
)
@ -40,7 +39,7 @@ func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") }
// Ensure the server can create a new data node.
func TestServer_CreateDataNode(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -64,7 +63,7 @@ func TestServer_CreateDataNode(t *testing.T) {
// Ensure the server returns an error when creating a duplicate node.
func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -81,7 +80,7 @@ func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
// Ensure the server can delete a node.
func TestServer_DeleteDataNode(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -106,7 +105,7 @@ func TestServer_DeleteDataNode(t *testing.T) {
// Test unuathorized requests logging
func TestServer_UnauthorizedRequests(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -149,7 +148,7 @@ func TestServer_UnauthorizedRequests(t *testing.T) {
// Test user privilege authorization.
func TestServer_UserPrivilegeAuthorization(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -186,7 +185,7 @@ func TestServer_UserPrivilegeAuthorization(t *testing.T) {
// Test single statement query authorization.
func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -255,7 +254,7 @@ func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
// Test multiple statement query authorization.
func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -302,7 +301,7 @@ func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
// Ensure the server can create a database.
func TestServer_CreateDatabase(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -321,7 +320,7 @@ func TestServer_CreateDatabase(t *testing.T) {
// Ensure the server returns an error when creating a duplicate database.
func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -337,7 +336,7 @@ func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
// Ensure the server can drop a database.
func TestServer_DropDatabase(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -360,7 +359,7 @@ func TestServer_DropDatabase(t *testing.T) {
// Ensure the server returns an error when dropping a database that doesn't exist.
func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -373,7 +372,7 @@ func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server can return a list of all databases.
func TestServer_Databases(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -395,7 +394,7 @@ func TestServer_Databases(t *testing.T) {
// Ensure the server can create a new user.
func TestServer_CreateUser(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -433,7 +432,7 @@ func TestServer_CreateUser(t *testing.T) {
// Ensure the server correctly detects when there is an admin user.
func TestServer_AdminUserExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -464,7 +463,7 @@ func TestServer_AdminUserExists(t *testing.T) {
// Ensure the server returns an error when creating an user without a name.
func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -475,7 +474,7 @@ func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
// Ensure the server returns an error when creating a duplicate user.
func TestServer_CreateUser_ErrUserExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -489,7 +488,7 @@ func TestServer_CreateUser_ErrUserExists(t *testing.T) {
// Ensure the server can delete an existing user.
func TestServer_DeleteUser(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -516,7 +515,7 @@ func TestServer_DeleteUser(t *testing.T) {
// Ensure the server can return a list of all users.
func TestServer_Users(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -538,7 +537,7 @@ func TestServer_Users(t *testing.T) {
// Ensure the server does not return non-existent users
func TestServer_NonExistingUsers(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -561,7 +560,7 @@ func TestServer_NonExistingUsers(t *testing.T) {
// Ensure the database can create a new retention policy.
func TestServer_CreateRetentionPolicy(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -594,7 +593,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when creating a retention policy with an invalid db.
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -605,7 +604,7 @@ func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when creating a retention policy without a name.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -617,7 +616,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when creating a duplicate retention policy.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -630,7 +629,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
// Ensure the database can alter an existing retention policy.
func TestServer_AlterRetentionPolicy(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -696,7 +695,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -724,7 +723,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy on invalid db.
func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -735,7 +734,7 @@ func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy without a name.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -747,7 +746,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when deleting a non-existent retention policy.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -759,7 +758,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
// Ensure the server can set the default retention policy
func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -794,7 +793,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when setting the default retention policy to a non-existant one.
func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -806,7 +805,7 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.
// Ensure the server prohibits a zero check interval for retention policy enforcement.
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -816,7 +815,7 @@ func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
}
func TestServer_EnforceRetentionPolices(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -853,7 +852,7 @@ func TestServer_EnforceRetentionPolices(t *testing.T) {
// Ensure the database can write data to the database.
func TestServer_WriteSeries(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -867,7 +866,6 @@ func TestServer_WriteSeries(t *testing.T) {
t.Fatal(err)
}
c.Sync(index)
warn("A")
// Write another point 10 seconds later so it goes through "raw series".
index, err = s.WriteSeries("foo", "mypolicy", []influxdb.Point{{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Fields: map[string]interface{}{"value": float64(100)}}})
@ -875,7 +873,6 @@ func TestServer_WriteSeries(t *testing.T) {
t.Fatal(err)
}
c.Sync(index)
warn("B")
// Retrieve first series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z")); err != nil {
@ -901,7 +898,7 @@ func TestServer_WriteSeries(t *testing.T) {
// Ensure the server can drop a measurement.
func TestServer_DropMeasurement(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -964,7 +961,7 @@ func TestServer_DropMeasurement(t *testing.T) {
// Ensure the server can handles drop measurement if none exists.
func TestServer_DropMeasurementNoneExists(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1004,7 +1001,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) {
// select * from memory where host=serverb
// select * from memory where region=uswest
func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1118,7 +1115,7 @@ func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) {
// Ensure the server can drop a series.
func TestServer_DropSeries(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1162,7 +1159,7 @@ func TestServer_DropSeries(t *testing.T) {
// Ensure the server can drop a series from measurement when more than one shard exists.
func TestServer_DropSeriesFromMeasurement(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1204,7 +1201,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
// Ensure that when merging many series together and some of them have a different number of points than others
// in a group by interval the results are correct
func TestServer_MergeManySeries(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1239,7 +1236,7 @@ func TestServer_MergeManySeries(t *testing.T) {
// ensure that the dropped series is gone
// ensure that we can still query: select value from cpu where region=uswest
func TestServer_DropSeriesTagsPreserved(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1313,7 +1310,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
// Ensure the server can execute a query and return the data correctly.
func TestServer_ExecuteQuery(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1393,7 +1390,7 @@ func TestServer_ExecuteQuery(t *testing.T) {
// Ensure the server respects limit and offset in show series queries
func TestServer_ShowSeriesLimitOffset(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1449,7 +1446,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) {
// Ensure that when querying for raw data values that they return in time order
func TestServer_RawDataReturnsInOrder(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1496,7 +1493,7 @@ func TestServer_RawDataReturnsInOrder(t *testing.T) {
// Ensure that limit and offset work
func TestServer_LimitAndOffset(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1545,7 +1542,7 @@ func TestServer_LimitAndOffset(t *testing.T) {
// Ensure the server can execute a wildcard query and return the data correctly.
func TestServer_ExecuteWildcardQuery(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1571,7 +1568,7 @@ func TestServer_ExecuteWildcardQuery(t *testing.T) {
// Ensure the server can execute a wildcard GROUP BY
func TestServer_ExecuteWildcardGroupBy(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1604,7 +1601,7 @@ func TestServer_ExecuteWildcardGroupBy(t *testing.T) {
}
func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1629,7 +1626,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
}
func TestServer_DeleteShardGroup(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1667,7 +1664,7 @@ func TestServer_DeleteShardGroup(t *testing.T) {
/* TODO(benbjohnson): Change test to not expose underlying series ids directly.
func TestServer_Measurements(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1738,7 +1735,7 @@ func TestServer_NormalizeMeasurement(t *testing.T) {
}
// Create server with a variety of databases, retention policies, and measurements
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1787,7 +1784,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
}
// Start server with database & retention policy.
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1809,7 +1806,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
// Ensure the server can create a continuous query
func TestServer_CreateContinuousQuery(t *testing.T) {
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1871,7 +1868,7 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) {
// Ensure
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
c := NewMessagingClient()
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -2019,7 +2016,7 @@ func NewServer() *Server {
// OpenServer returns a new, open test server instance.
func OpenServer(client influxdb.MessagingClient) *Server {
s := OpenUninitializedServer(client)
if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil {
if err := s.Initialize(url.URL{Host: "127.0.0.1:8080"}); err != nil {
panic(err.Error())
}
return s
@ -2081,147 +2078,10 @@ func (s *Server) MustWriteSeries(database, retentionPolicy string, points []infl
if err != nil {
panic(err.Error())
}
s.Client().(*MessagingClient).Sync(index)
s.Client().(*test.MessagingClient).Sync(index)
return index
}
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
messagesByTopicID map[uint64][]*messaging.Message // message by topic
PublishFunc func(*messaging.Message) (uint64, error)
ConnFunc func(topicID uint64) influxdb.MessagingConn
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
c := &MessagingClient{
messagesByTopicID: make(map[uint64][]*messaging.Message),
}
c.PublishFunc = c.DefaultPublishFunc
c.ConnFunc = c.DefaultConnFunc
return c
}
// Close closes all open connections.
func (c *MessagingClient) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
for _, conn := range c.conns {
conn.Close()
}
return nil
}
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { return c.PublishFunc(m) }
// DefaultPublishFunc sets an autoincrementing index on the message and sends it to each topic connection.
func (c *MessagingClient) DefaultPublishFunc(m *messaging.Message) (uint64, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Increment index and assign it to message.
c.index++
m.Index = c.index
// Append message to the topic.
c.messagesByTopicID[m.TopicID] = append(c.messagesByTopicID[m.TopicID], m)
// Send to each connection for the topic.
for _, conn := range c.conns {
if conn.topicID == m.TopicID {
conn.Send(m)
}
}
return m.Index, nil
}
func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}
// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()
defer c.mu.Unlock()
// Create new connection.
conn := NewMessagingConn(topicID)
// Track connections.
c.conns = append(c.conns, conn)
return conn
}
// Sync blocks until a given index has been sent through the client.
func (c *MessagingClient) Sync(index uint64) {
for {
c.mu.Lock()
if c.index >= index {
c.mu.Unlock()
time.Sleep(10 * time.Millisecond)
return
}
c.mu.Unlock()
// Otherwise wait momentarily and check again.
time.Sleep(1 * time.Millisecond)
}
}
// MessagingConn represents a mockable connection implementing influxdb.MessagingConn.
type MessagingConn struct {
mu sync.Mutex
topicID uint64
index uint64
c chan *messaging.Message
}
// NewMessagingConn returns a new instance of MessagingConn.
func NewMessagingConn(topicID uint64) *MessagingConn {
return &MessagingConn{
topicID: topicID,
}
}
// Open starts the stream from a given index.
func (c *MessagingConn) Open(index uint64) error {
// TODO: Fill connection stream with existing messages.
c.c = make(chan *messaging.Message, 1024)
return nil
}
// Close closes the streaming channel.
func (c *MessagingConn) Close() error {
close(c.c)
return nil
}
// C returns a channel for streaming message.
func (c *MessagingConn) C() <-chan *messaging.Message { return c.c }
func (c *MessagingConn) Send(m *messaging.Message) {
// Ignore any old messages.
c.mu.Lock()
if m.Index <= c.index {
c.mu.Unlock()
return
}
c.index = m.Index
c.mu.Unlock()
// Send message to channel.
c.c <- m
}
// tempfile returns a temporary path.
func tempfile() string {
f, _ := ioutil.TempFile("", "influxdb-")

View File

@ -100,7 +100,7 @@ func (s *Shard) open(path string, conn MessagingConn) error {
}
// Open connection.
if err := conn.Open(s.index); err != nil {
if err := conn.Open(s.index, true); err != nil {
_ = s.close()
return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err)
}

View File

@ -7,12 +7,15 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/test"
)
// Ensure a transaction can retrieve a list of iterators for a simple SELECT statement.
func TestTx_CreateIterators(t *testing.T) {
t.Skip()
s := OpenDefaultServer(NewMessagingClient())
c := test.NewMessagingClient()
defer c.Close()
s := OpenDefaultServer(c)
defer s.Close()
// Write to us-east