merge conflict

pull/1276/head
Cory LaNou 2014-12-31 15:47:10 -06:00
commit 2303a0f014
11 changed files with 345 additions and 197 deletions

View File

@ -4,6 +4,8 @@ import (
"fmt"
"net"
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"time"
@ -22,9 +24,14 @@ const (
// can be queried concurrently at one time.
DefaultConcurrentShardQueryLimit = 10
// DefaultAPIReadTimeout represents the amount time before an API request
// times out.
// DefaultAPIReadTimeout represents the duration before an API request times out.
DefaultAPIReadTimeout = 5 * time.Second
// DefaultBrokerPort represents the default port the broker runs on.
DefaultBrokerPort = 8086
// DefaultHTTPAPIPort represents the default port the HTTP API runs on.
DefaultHTTPAPIPort = 8086
)
// Config represents the configuration format for the influxd binary.
@ -61,13 +68,13 @@ type Config struct {
} `toml:"udp_servers"`
} `toml:"input_plugins"`
Raft struct {
Broker struct {
Port int `toml:"port"`
Dir string `toml:"dir"`
Timeout Duration `toml:"election-timeout"`
} `toml:"raft"`
} `toml:"broker"`
Storage struct {
Data struct {
Dir string `toml:"dir"`
WriteBufferSize int `toml:"write-buffer-size"`
MaxOpenShards int `toml:"max-open-shards"`
@ -75,7 +82,7 @@ type Config struct {
WriteBatchSize int `toml:"write-batch-size"`
Engines map[string]toml.Primitive `toml:"engines"`
RetentionSweepPeriod Duration `toml:"retention-sweep-period"`
} `toml:"storage"`
} `toml:"data"`
Cluster struct {
Dir string `toml:"dir"`
@ -97,15 +104,21 @@ type Config struct {
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
u, _ := user.Current()
c := &Config{}
c.Storage.RetentionSweepPeriod = Duration(10 * time.Minute)
c.Data.RetentionSweepPeriod = Duration(10 * time.Minute)
c.Cluster.ConcurrentShardQueryLimit = DefaultConcurrentShardQueryLimit
c.Raft.Timeout = Duration(1 * time.Second)
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
c.Broker.Port = DefaultBrokerPort
c.Broker.Timeout = Duration(1 * time.Second)
c.HTTPAPI.Port = DefaultHTTPAPIPort
c.HTTPAPI.ReadTimeout = Duration(DefaultAPIReadTimeout)
c.Cluster.MinBackoff = Duration(1 * time.Second)
c.Cluster.MaxBackoff = Duration(10 * time.Second)
c.Cluster.ProtobufHeartbeatInterval = Duration(10 * time.Millisecond)
c.Storage.WriteBufferSize = 1000
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
c.Data.WriteBufferSize = 1000
c.Cluster.WriteBufferSize = 1000
c.Cluster.MaxResponseBufferSize = 100
@ -124,29 +137,29 @@ func NewConfig() *Config {
return c
}
// PointBatchSize returns the storage point batch size, if set.
// PointBatchSize returns the data point batch size, if set.
// If not set, the LevelDB point batch size is returned.
// If that is not set then the default point batch size is returned.
func (c *Config) PointBatchSize() int {
if c.Storage.PointBatchSize != 0 {
return c.Storage.PointBatchSize
if c.Data.PointBatchSize != 0 {
return c.Data.PointBatchSize
}
return DefaultPointBatchSize
}
// WriteBatchSize returns the storage write batch size, if set.
// WriteBatchSize returns the data write batch size, if set.
// If not set, the LevelDB write batch size is returned.
// If that is not set then the default write batch size is returned.
func (c *Config) WriteBatchSize() int {
if c.Storage.WriteBatchSize != 0 {
return c.Storage.WriteBatchSize
if c.Data.WriteBatchSize != 0 {
return c.Data.WriteBatchSize
}
return DefaultWriteBatchSize
}
// MaxOpenShards returns the maximum number of shards to keep open at once.
func (c *Config) MaxOpenShards() int {
return c.Storage.MaxOpenShards
return c.Data.MaxOpenShards
}
// ApiHTTPListenAddr returns the binding address the API HTTP server
@ -154,14 +167,14 @@ func (c *Config) ApiHTTPListenAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.HTTPAPI.Port)
}
// RaftListenAddr returns the binding address the Raft server
func (c *Config) RaftListenAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Raft.Port)
// BrokerListenAddr returns the binding address the Broker server
func (c *Config) BrokerListenAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port)
}
// RaftConnectionString returns the address required to contact the Raft server
func (c *Config) RaftConnectionString() string {
return fmt.Sprintf("http://%s:%d", c.Hostname, c.Raft.Port)
// BrokerConnectionString returns the address required to contact the Broker server
func (c *Config) BrokerConnectionString() string {
return fmt.Sprintf("http://%s:%d", c.Hostname, c.Broker.Port)
}
// Size represents a TOML parseable file size.

View File

@ -54,7 +54,7 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("admin assets mismatch: %v", c.Admin.Assets)
}
if c.HTTPAPI.Port != 0 {
if c.HTTPAPI.Port != main.DefaultBrokerPort {
t.Fatalf("http api port mismatch: %v", c.HTTPAPI.Port)
} else if c.HTTPAPI.SSLPort != 8087 {
t.Fatalf("http api ssl port mismatch: %v", c.HTTPAPI.SSLPort)
@ -94,16 +94,16 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
}
if c.Raft.Port != 8090 {
t.Fatalf("raft port mismatch: %v", c.Raft.Port)
} else if c.Raft.Dir != "/tmp/influxdb/development/raft" {
t.Fatalf("raft dir mismatch: %v", c.Raft.Dir)
} else if time.Duration(c.Raft.Timeout) != time.Second {
t.Fatalf("raft duration mismatch: %v", c.Raft.Timeout)
if c.Broker.Port != 8090 {
t.Fatalf("broker port mismatch: %v", c.Broker.Port)
} else if c.Broker.Dir != "/tmp/influxdb/development/broker" {
t.Fatalf("broker dir mismatch: %v", c.Broker.Dir)
} else if time.Duration(c.Broker.Timeout) != time.Second {
t.Fatalf("broker duration mismatch: %v", c.Broker.Timeout)
}
if c.Storage.Dir != "/tmp/influxdb/development/db" {
t.Fatalf("data dir mismatch: %v", c.Storage.Dir)
if c.Data.Dir != "/tmp/influxdb/development/db" {
t.Fatalf("data dir mismatch: %v", c.Data.Dir)
}
if c.Cluster.ProtobufPort != 8099 {
@ -184,17 +184,21 @@ database = "graphite_udp" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
# Broker configuration
[broker]
# The broker port should be open between all servers in a cluster.
# However, this port shouldn't be accessible from the internet.
port = 8090
# Where the raft logs are stored. The user running InfluxDB will need read/write access.
dir = "/tmp/influxdb/development/raft"
# Where the broker logs are stored. The user running InfluxDB will need read/write access.
dir = "/tmp/influxdb/development/broker"
# election-timeout = "2s"
[storage]
[data]
dir = "/tmp/influxdb/development/db"
# How many requests to potentially buffer in memory. If the buffer gets filled then writes
# will still be logged and once the local storage has caught up (or compacted) the writes
# will be replayed from the WAL
@ -211,7 +215,7 @@ retention-sweep-period = "10m"
# prior to shutting down. Any server can be pointed to
# as a seed. It will find the Raft leader automatically.
# Here's an example. Note that the port on the host is the same as the raft port.
# Here's an example. Note that the port on the host is the same as the broker port.
seed-servers = ["hosta:8090", "hostb:8090"]
# Replication happens over a TCP connection with a Protobuf protocol.

View File

@ -34,7 +34,7 @@ func execCreateCluster(args []string) {
// Create the broker.
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
if err := b.Open(config.Broker.Dir, config.BrokerConnectionString()); err != nil {
log.Fatalf("broker: %s", err.Error())
}
@ -49,22 +49,22 @@ func execCreateCluster(args []string) {
log.Fatalf("create-cluster: data directory already exists")
}
// Now create the storage directory.
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
log.Fatalf("create-cluster storage: %s", err.Error())
// Now create the data directory.
if err := os.MkdirAll(config.Data.Dir, 0744); err != nil {
log.Fatalf("create-cluster data dir: %s", err.Error())
}
// Configure the Messaging Client such that this node connects to itself.
var seedUrls []*url.URL
u, err := url.Parse(config.RaftListenAddr())
u, err := url.Parse(config.BrokerListenAddr())
if err != nil {
log.Fatalf("create-cluster seed URLs: %s", err.Error())
}
seedUrls = append(seedUrls, u)
c := messaging.NewClient(0) // TODO: Set replica id.
if err := c.Open(filepath.Join(config.Storage.Dir, messagingClientFile), seedUrls); err != nil {
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), seedUrls); err != nil {
log.Fatalf("create-cluster open client: %s", err.Error())
}
if err := c.Close(); err != nil {
@ -73,7 +73,7 @@ func execCreateCluster(args []string) {
}
log.Println("new cluster node created as", *role, "in", config.Raft.Dir)
log.Println("new cluster node created as", *role, "in", config.Broker.Dir)
}
func printCreateClusterUsage() {

View File

@ -3,32 +3,40 @@ package main
import (
"net/http"
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/messaging"
)
// Handler represents an HTTP handler for InfluxDB node. Depending on its role, it
// will serve many different endpoints.
// Handler represents an HTTP handler for InfluxDB node.
// Depending on its role, it will serve many different endpoints.
type Handler struct {
brokerHandler *messaging.Handler
serverHandler *influxdb.Handler
brokerHandler http.Handler
serverHandler http.Handler
}
// NewHandler returns a new instance of Handler.
func NewHandler(b *messaging.Handler, s *influxdb.Handler) *Handler {
func NewHandler(bh, sh http.Handler) *Handler {
return &Handler{
brokerHandler: b,
serverHandler: s,
brokerHandler: bh,
serverHandler: sh,
}
}
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Route raft and messaging paths to the broker.
if strings.HasPrefix(r.URL.Path, "/raft") || strings.HasPrefix(r.URL.Path, "/messages") {
if h.brokerHandler == nil {
http.NotFound(w, r)
return
}
h.brokerHandler.ServeHTTP(w, r)
return
}
// Route all other paths to the server.
if h.serverHandler == nil {
http.NotFound(w, r)
return
}
h.serverHandler.ServeHTTP(w, r)
}

View File

@ -50,7 +50,7 @@ func execJoinCluster(args []string) {
// Broker required -- but don't initialize it.
// Joining a cluster will do that.
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
if err := b.Open(config.Broker.Dir, config.BrokerConnectionString()); err != nil {
log.Fatalf("join: %s", err)
}
@ -74,17 +74,17 @@ func execJoinCluster(args []string) {
// If joining as a data node then create a data directory.
if *role == "combined" || *role == "data" {
if _, err := os.Stat(config.Storage.Dir); err == nil {
log.Fatalf("join-cluster: storage directory already exists")
if _, err := os.Stat(config.Data.Dir); err == nil {
log.Fatalf("join-cluster: data directory already exists")
}
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
log.Fatalf("join-cluster storage: %s", err.Error())
if err := os.MkdirAll(config.Data.Dir, 0744); err != nil {
log.Fatalf("join-cluster data dir: %s", err.Error())
}
// Configure the Messaging Client.
c := messaging.NewClient(0) // TODO: Set replica id.
if err := c.Open(filepath.Join(config.Storage.Dir, messagingClientFile), seedURLs); err != nil {
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), seedURLs); err != nil {
log.Fatalf("join-cluster open client: %s", err.Error())
}
if err := c.Close(); err != nil {

View File

@ -56,8 +56,6 @@ func main() {
// Extract name from args.
switch cmd {
case "create-cluster":
execCreateCluster(args[1:])
case "join-cluster":
execJoinCluster(args[1:])
case "run":
@ -103,7 +101,6 @@ Usage:
The commands are:
create-cluster create a new node that other nodes can join to form a new cluster
join-cluster create a new node that will join an existing cluster
run run node with existing configuration
version displays the InfluxDB version

View File

@ -12,7 +12,6 @@ import (
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/messaging"
)
@ -23,144 +22,184 @@ func execRun(args []string) {
var (
configPath = fs.String("config", configDefaultPath, "")
pidPath = fs.String("pidfile", "", "")
role = fs.String("role", "combined", "")
hostname = fs.String("hostname", "", "")
seedServers = fs.String("seed-servers", "", "")
)
fs.Usage = printRunUsage
fs.Parse(args)
// Write pid file.
if *pidPath != "" {
pid := strconv.Itoa(os.Getpid())
if err := ioutil.WriteFile(*pidPath, []byte(pid), 0644); err != nil {
log.Fatal(err)
}
// Validate CLI flags.
if *role != "combined" && *role != "broker" && *role != "data" {
log.Fatalf("role must be 'combined', 'broker', or 'data'")
}
// Parse configuration.
config, err := ParseConfigFile(*configPath)
if err != nil {
log.Fatalf("config: %s", err)
}
// Override config properties.
if *hostname != "" {
config.Hostname = *hostname
}
// TODO(benbjohnson): Start admin server.
// Parse broker urls from seed servers.
brokerURLs := parseSeedServers(*seedServers)
// Print sweet InfluxDB logo and write the process id to file.
log.Print(logo)
if config.BindAddress == "" {
log.Printf("Starting Influx Server %s...", version)
} else {
log.Printf("Starting Influx Server %s bound to %s...", version, config.BindAddress)
}
writePIDFile(*pidPath)
// Start up the node.
var brokerHandler *messaging.Handler
var serverHandler *influxdb.Handler
var brokerDirExists bool
var storageDirExists bool
// Parse the configuration and determine if a broker and/or server exist.
config := parseConfig(*configPath, *hostname)
hasBroker := fileExists(config.Broker.Dir)
hasServer := fileExists(config.Data.Dir)
initializing := !hasBroker && !hasServer
if _, err := os.Stat(config.Raft.Dir); err == nil {
brokerDirExists = true
}
if _, err := os.Stat(config.Storage.Dir); err == nil {
storageDirExists = true
}
// Open broker if it exists or if we're initializing for the first time.
var b *messaging.Broker
var h *Handler
if hasBroker || (initializing && (*role == "combined" || *role == "broker")) {
b = openBroker(config.Broker.Dir, config.BrokerConnectionString())
if !brokerDirExists && !storageDirExists {
// Node is completely new, so create the minimum needed, which
// is a storage directory.
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
log.Fatal(err)
}
storageDirExists = true
}
// If the Broker directory exists, open a Broker on this node.
if brokerDirExists {
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
log.Fatalf("failed to open Broker: %v", err.Error())
}
brokerHandler = messaging.NewHandler(b)
}
// If the storage directory exists, open a Data node.
if storageDirExists {
var client influxdb.MessagingClient
var server *influxdb.Server
clientFilePath := filepath.Join(config.Storage.Dir, messagingClientFile)
if _, err := os.Stat(clientFilePath); err == nil {
var brokerURLs []*url.URL
for _, s := range strings.Split(*seedServers, ",") {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("seed server %v", err)
}
brokerURLs = append(brokerURLs, u)
// If this is the first time running then initialize a broker.
// Update the seed server so the server can connect locally.
if initializing {
if err := b.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
}
c := messaging.NewClient(0) // TODO: Set replica id.
if err := c.Open(clientFilePath, brokerURLs); err != nil {
log.Fatalf("Error opening Messaging Client: %s", err.Error())
}
defer c.Close()
client = c
// Start the broker handler.
h = &Handler{brokerHandler: messaging.NewHandler(b)}
go func() { log.Fatal(http.ListenAndServe(config.BrokerListenAddr(), h)) }()
log.Printf("Broker running on %s", config.BrokerListenAddr())
}
// Open server if it exists or we're initializing for the first time.
var s *influxdb.Server
if hasServer || (initializing && (*role == "combined" || *role == "data")) {
s = openServer(config.Data.Dir)
// If the server is uninitialized then initialize it with the broker.
// Otherwise simply create a messaging client with the server id.
if s.ID() == 0 {
initServer(s, b)
} else {
client = messaging.NewLoopbackClient()
openServerClient(s, brokerURLs)
}
server = influxdb.NewServer(client)
err = server.Open(config.Storage.Dir)
if err != nil {
log.Fatalf("failed to open data Server %v", err.Error())
// Start the server handler.
// If it uses the same port as the broker then simply attach it.
sh := influxdb.NewHandler(s)
if config.BrokerListenAddr() == config.ApiHTTPListenAddr() {
h.serverHandler = sh
} else {
go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), sh)) }()
}
serverHandler = influxdb.NewHandler(server)
// Spin up any grahite servers
for _, g := range config.Graphite {
// Get a new server
s := graphite.Server{Server: server}
// Set database
s.Database = g.Database
// Set the addresses up
s.TCPAddr = g.TCPAddr(config.BindAddress)
s.UDPAddr = g.UDPAddr(config.BindAddress)
// Spin it up
go func() { log.Fatal(s.ListenAndServe) }()
}
}
// TODO: startProfiler()
// TODO: -reset-root
// Start up HTTP server(s)
if config.ApiHTTPListenAddr() != config.RaftListenAddr() {
if serverHandler != nil {
go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), serverHandler)) }()
}
if brokerHandler != nil {
go func() { log.Fatal(http.ListenAndServe(config.RaftListenAddr(), brokerHandler)) }()
}
} else {
h := NewHandler(brokerHandler, serverHandler)
go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), h)) }()
log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr())
}
// Wait indefinitely.
<-(chan struct{})(nil)
}
// write the current process id to a file specified by path.
func writePIDFile(path string) {
if path == "" {
return
}
// Retrieve the PID and write it.
pid := strconv.Itoa(os.Getpid())
if err := ioutil.WriteFile(path, []byte(pid), 0644); err != nil {
log.Fatal(err)
}
}
// parses the configuration from a given path. Sets overrides as needed.
func parseConfig(path, hostname string) *Config {
// Parse configuration.
config, err := ParseConfigFile(path)
if os.IsNotExist(err) {
config = NewConfig()
} else if err != nil {
log.Fatalf("config: %s", err)
}
// Override config properties.
if hostname != "" {
config.Hostname = hostname
}
return config
}
// creates and initializes a broker at a given path.
func openBroker(path, addr string) *messaging.Broker {
b := messaging.NewBroker()
if err := b.Open(path, addr); err != nil {
log.Fatalf("failed to open broker: %s", err)
}
return b
}
// creates and initializes a server at a given path.
func openServer(path string) *influxdb.Server {
s := influxdb.NewServer()
if err := s.Open(path); err != nil {
log.Fatalf("failed to open data server: %v", err.Error())
}
return s
}
// initializes a new server that does not yet have an ID.
func initServer(s *influxdb.Server, b *messaging.Broker) {
// TODO: Change messaging client to not require a ReplicaID so we can create
// a replica without already being a replica.
// Create replica on broker.
if err := b.CreateReplica(1); err != nil {
log.Fatalf("replica creation error: %s", err)
}
// Initialize messaging client.
c := messaging.NewClient(1)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
// opens the messaging client and attaches it to the server.
func openServerClient(s *influxdb.Server, brokerURLs []*url.URL) {
c := messaging.NewClient(s.ID())
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), brokerURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
}
// parses a comma-delimited list of URLs.
func parseSeedServers(s string) (a []*url.URL) {
for _, s := range strings.Split(s, ",") {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("cannot parse seed servers: %s", err)
}
a = append(a, u)
}
return
}
// returns true if the file exists.
func fileExists(path string) bool {
if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}
return true
}
func printRunUsage() {
log.Printf(`usage: run [flags]
@ -171,6 +210,12 @@ use Distributed Consensus, but is otherwise fully-functional.
-config <path>
Set the path to the configuration file. Defaults to %s.
-role <role>
Set the role to be 'combined', 'broker' or 'data'. broker' means it will take
part in Raft Distributed Consensus. 'data' means it will store time-series data.
'combined' means it will do both. The default is 'combined'. In role other than
these three is invalid.
-hostname <name>
Override the hostname, the 'hostname' configuration option will be overridden.

View File

@ -67,9 +67,9 @@ func TestSelectStatement_Substatement(t *testing.T) {
// 5. 4 with different condition order
{
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE (bb.host = "serverb" OR bb.host = "serverc") AND aa.host = "servera" AND 1 = 2`,
stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE ((bb.host = "serverb" OR bb.host = "serverc") AND aa.host = "servera") AND 1 = 2`,
expr: &influxql.VarRef{Val: "bb.value"},
sub: `SELECT bb.value FROM bb WHERE (bb.host = "serverb" OR bb.host = "serverc") AND 1.000 = 2.000`,
sub: `SELECT bb.value FROM bb WHERE ((bb.host = "serverb" OR bb.host = "serverc")) AND 1.000 = 2.000`,
},
}

View File

@ -101,6 +101,11 @@ func (b *Broker) Close() error {
return nil
}
// URL returns the connection url for the broker.
func (b *Broker) URL() *url.URL {
return b.log.URL
}
// Initialize creates a new cluster.
func (b *Broker) Initialize() error {
if err := b.log.Initialize(); err != nil {

View File

@ -88,11 +88,8 @@ type Server struct {
}
// NewServer returns a new instance of Server.
// The server requires a client to the messaging broker to be passed in.
func NewServer(client MessagingClient) *Server {
assert(client != nil, "messaging client required")
func NewServer() *Server {
return &Server{
client: client,
meta: &metastore{},
dataNodes: make(map[uint64]*DataNode),
databases: make(map[string]*database),
@ -102,9 +99,21 @@ func NewServer(client MessagingClient) *Server {
}
}
// ID returns the data node id for the server.
// Returns zero if the server is closed or the server has not joined a cluster.
func (s *Server) ID() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.id
}
// Path returns the path used when opening the server.
// Returns an empty string when the server is closed.
func (s *Server) Path() string { return s.path }
func (s *Server) Path() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.path
}
// shardPath returns the path for a shard.
func (s *Server) shardPath(id uint64) string {
@ -144,10 +153,6 @@ func (s *Server) Open(path string) error {
// Set the server path.
s.path = path
// Start goroutine to read messages from the broker.
s.done = make(chan struct{}, 0)
go s.processor(s.done)
return nil
}
@ -163,9 +168,8 @@ func (s *Server) Close() error {
return ErrServerClosed
}
// Close notification.
close(s.done)
s.done = nil
// Close message processing.
s.setClient(nil)
// Close metastore.
_ = s.meta.close()
@ -202,6 +206,44 @@ func (s *Server) load() error {
})
}
// Client retrieves the current messaging client.
func (s *Server) Client() MessagingClient {
s.mu.RLock()
defer s.mu.RUnlock()
return s.client
}
// SetClient sets the messaging client on the server.
func (s *Server) SetClient(client MessagingClient) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.setClient(client)
}
func (s *Server) setClient(client MessagingClient) error {
// Ensure the server is open.
if !s.opened() {
return ErrServerClosed
}
// Stop previous processor, if running.
if s.done != nil {
close(s.done)
s.done = nil
}
// Set the messaging client.
s.client = client
// Start goroutine to read messages from the broker.
if client != nil {
s.done = make(chan struct{}, 0)
go s.processor(client, s.done)
}
return nil
}
// broadcast encodes a message as JSON and send it to the broker's broadcast topic.
// This function waits until the message has been processed by the server.
// Returns the broker log index of the message or an error.
@ -250,6 +292,32 @@ func (s *Server) sync(index uint64) error {
}
}
// Initialize creates a new data node and initializes the server's id to 1.
func (s *Server) Initialize(u *url.URL) error {
// Create a new data node.
if err := s.CreateDataNode(u); err != nil {
return err
}
// Ensure the data node returns with an ID of 1.
// If it doesn't then something went really wrong. We have to panic because
// the messaging client relies on the first server being assigned ID 1.
n := s.DataNodeByURL(u)
assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID)
// Set the ID on the metastore.
if err := s.meta.mustUpdate(func(tx *metatx) error {
return tx.setID(n.ID)
}); err != nil {
return err
}
// Set the ID on the server.
s.id = 1
return nil
}
// DataNode returns a data node by id.
func (s *Server) DataNode(id uint64) *DataNode {
s.mu.RLock()
@ -1059,8 +1127,7 @@ func (s *Server) Measurements(database string) (a Measurements) {
}
// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Server) processor(done chan struct{}) {
client := s.client
func (s *Server) processor(client MessagingClient, done chan struct{}) {
for {
// Read incoming message.
var m *messaging.Message

View File

@ -16,8 +16,7 @@ import (
// Ensure the server can be successfully opened and closed.
func TestServer_Open(t *testing.T) {
c := NewMessagingClient()
s := NewServer(c)
s := NewServer()
defer s.Close()
if err := s.Server.Open(tempfile()); err != nil {
t.Fatal(err)
@ -527,28 +526,38 @@ type Server struct {
}
// NewServer returns a new test server instance.
func NewServer(client influxdb.MessagingClient) *Server {
return &Server{influxdb.NewServer(client)}
func NewServer() *Server {
return &Server{influxdb.NewServer()}
}
// OpenServer returns a new, open test server instance.
func OpenServer(client influxdb.MessagingClient) *Server {
s := NewServer(client)
s := NewServer()
if err := s.Open(tempfile()); err != nil {
panic(err.Error())
}
if err := s.SetClient(client); err != nil {
panic(err.Error())
}
return s
}
// Restart stops and restarts the server.
func (s *Server) Restart() {
path := s.Path()
path, client := s.Path(), s.Client()
// Stop the server.
if err := s.Server.Close(); err != nil {
panic("close: " + err.Error())
}
// Open and reset the client.
if err := s.Server.Open(path); err != nil {
panic("open: " + err.Error())
}
if err := s.Server.SetClient(client); err != nil {
panic("client: " + err.Error())
}
}
// Close shuts down the server and removes all temporary files.