Merge pull request #2840 from influxdb/parse_cluster_consistency

Add cluster consistency parsing to cluster package
pull/2857/head
Philip O'Toole 2015-06-09 14:22:45 -07:00
commit 22e6cbc960
7 changed files with 60 additions and 42 deletions

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"log" "log"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
@ -41,8 +42,27 @@ var (
// ErrWriteFailed is returned when no writes succeeded. // ErrWriteFailed is returned when no writes succeeded.
ErrWriteFailed = errors.New("write failed") ErrWriteFailed = errors.New("write failed")
// ErrInvalidConsistencyLevel is returned when parsing the string version
// of a consistency level.
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
) )
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
return ConsistencyLevelAny, nil
case "one":
return ConsistencyLevelOne, nil
case "quorum":
return ConsistencyLevelQuorum, nil
case "all":
return ConsistencyLevelAll, nil
default:
return 0, ErrInvalidConsistencyLevel
}
}
// PointsWriter handles writes across multiple local and remote data nodes. // PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct { type PointsWriter struct {
mu sync.RWMutex mu sync.RWMutex

View File

@ -84,7 +84,10 @@ func (cmd *Command) Run(args ...string) error {
} }
// Create server from config and start it. // Create server from config and start it.
s := NewServer(config) s, err := NewServer(config)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err) return fmt.Errorf("open server: %s", err)
} }

View File

@ -42,7 +42,7 @@ type Server struct {
} }
// NewServer returns a new instance of Server built from a config. // NewServer returns a new instance of Server built from a config.
func NewServer(c *Config) *Server { func NewServer(c *Config) (*Server, error) {
// Construct base meta store and data store. // Construct base meta store and data store.
s := &Server{ s := &Server{
err: make(chan error), err: make(chan error),
@ -84,10 +84,12 @@ func NewServer(c *Config) *Server {
s.appendUDPService(c.UDP) s.appendUDPService(c.UDP)
s.appendRetentionPolicyService(c.Retention) s.appendRetentionPolicyService(c.Retention)
for _, g := range c.Graphites { for _, g := range c.Graphites {
s.appendGraphiteService(g) if err := s.appendGraphiteService(g); err != nil {
return nil, err
}
} }
return s return s, nil
} }
func (s *Server) appendClusterService(c cluster.Config) { func (s *Server) appendClusterService(c cluster.Config) {
@ -152,13 +154,18 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
s.Services = append(s.Services, srv) s.Services = append(s.Services, srv)
} }
func (s *Server) appendGraphiteService(c graphite.Config) { func (s *Server) appendGraphiteService(c graphite.Config) error {
if !c.Enabled { if !c.Enabled {
return return nil
} }
srv := graphite.NewService(c) srv, err := graphite.NewService(c)
if err != nil {
return err
}
srv.PointsWriter = s.PointsWriter srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv) s.Services = append(s.Services, srv)
return nil
} }
func (s *Server) appendUDPService(c udp.Config) { func (s *Server) appendUDPService(c udp.Config) {

View File

@ -30,9 +30,9 @@ type Server struct {
// NewServer returns a new instance of Server. // NewServer returns a new instance of Server.
func NewServer(c *run.Config) *Server { func NewServer(c *run.Config) *Server {
srv, _ := run.NewServer(c)
s := Server{ s := Server{
Server: run.NewServer(c), Server: srv,
Config: c, Config: c,
} }
// Set the logger to discard unless verbose is on // Set the logger to discard unless verbose is on

View File

@ -3,7 +3,6 @@ package graphite
import ( import (
"strings" "strings"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/toml" "github.com/influxdb/influxdb/toml"
) )
@ -56,19 +55,3 @@ func NewConfig() Config {
func (c *Config) LastEnabled() bool { func (c *Config) LastEnabled() bool {
return c.NamePosition == strings.ToLower("last") return c.NamePosition == strings.ToLower("last")
} }
// ConsistencyAsEnum returns the enumerated write consistency level.
func (c *Config) ConsistencyAsEnum() cluster.ConsistencyLevel {
switch strings.ToLower(c.ConsistencyLevel) {
case "any":
return cluster.ConsistencyLevelAny
case "one":
return cluster.ConsistencyLevelOne
case "quorum":
return cluster.ConsistencyLevelQuorum
case "all":
return cluster.ConsistencyLevelAll
default:
return cluster.ConsistencyLevelOne
}
}

View File

@ -44,24 +44,29 @@ type Service struct {
} }
// NewService returns an instance of the Graphite service. // NewService returns an instance of the Graphite service.
func NewService(c Config) *Service { func NewService(c Config) (*Service, error) {
s := Service{ s := Service{
bindAddress: c.BindAddress, bindAddress: c.BindAddress,
database: c.Database, database: c.Database,
protocol: c.Protocol, protocol: c.Protocol,
batchSize: c.BatchSize, batchSize: c.BatchSize,
batchTimeout: time.Duration(c.BatchTimeout), batchTimeout: time.Duration(c.BatchTimeout),
consistencyLevel: c.ConsistencyAsEnum(), logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags), done: make(chan struct{}),
done: make(chan struct{}),
} }
consistencyLevel, err := cluster.ParseConsistencyLevel(c.ConsistencyLevel)
if err != nil {
return nil, err
}
s.consistencyLevel = consistencyLevel
parser := NewParser() parser := NewParser()
parser.Separator = c.NameSeparator parser.Separator = c.NameSeparator
parser.LastEnabled = c.LastEnabled() parser.LastEnabled = c.LastEnabled()
s.parser = parser s.parser = parser
return &s return &s, nil
} }
// Open starts the Graphite input processing data. // Open starts the Graphite input processing data.

View File

@ -241,9 +241,9 @@ func Test_ServerGraphiteTCP(t *testing.T) {
config.BatchTimeout = toml.Duration(time.Second) config.BatchTimeout = toml.Duration(time.Second)
config.BindAddress = ":0" config.BindAddress = ":0"
service := graphite.NewService(config) service, err := graphite.NewService(config)
if service == nil { if err != nil {
t.Fatal("failed to create Graphite service") t.Fatalf("failed to create Graphite service: %s", err.Error())
} }
// Allow test to wait until points are written. // Allow test to wait until points are written.
@ -307,9 +307,9 @@ func Test_ServerGraphiteUDP(t *testing.T) {
config.BindAddress = ":10000" config.BindAddress = ":10000"
config.Protocol = "udp" config.Protocol = "udp"
service := graphite.NewService(config) service, err := graphite.NewService(config)
if service == nil { if err != nil {
t.Fatal("failed to create Graphite service") t.Fatalf("failed to create Graphite service: %s", err.Error())
} }
// Allow test to wait until points are written. // Allow test to wait until points are written.