Refactor messaging error handling.

pull/903/head
Ben Johnson 2014-10-24 17:45:02 -06:00
parent a3c7549ee9
commit 4b276bada3
4 changed files with 117 additions and 42 deletions

View File

@ -58,16 +58,6 @@ func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
return nil, err
}
type CreateDatabaseCommand struct {
Name string `json:"name"`
}
func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.CreateDatabase(c.Name)
return nil, err
}
type SaveDbUserCommand struct {
User *cluster.DbUser `json:"user"`
}

View File

@ -18,6 +18,9 @@ var (
// ErrDatabaseExists is returned when creating a database with the same
// name as an existing database.
ErrDatabaseExists = errors.New("database exists")
// ErrDatabaseNotFound is returned when dropping a non-existent database.
ErrDatabaseNotFound = errors.New("database not found")
)
const (

View File

@ -13,14 +13,23 @@ import (
"github.com/influxdb/influxdb/protocol"
)
const (
createDatabaseMessageType = messaging.MessageType(0x00)
deleteDatabaseMessageType = messaging.MessageType(0x01)
createDBUserMessageType = messaging.MessageType(0x11)
deleteDBUserMessageType = messaging.MessageType(0x12)
)
// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
path string
done chan struct{} // goroutine close notification
index uint64 // highest broadcast index seen
client MessagingClient
client MessagingClient // broker client
index uint64 // highest broadcast index seen
errors map[uint64]error // message errors
databases map[string]*Database
}
@ -32,6 +41,7 @@ func NewServer(client MessagingClient) *Server {
return &Server{
client: client,
databases: make(map[string]*Database),
errors: make(map[uint64]error),
}
}
@ -102,25 +112,34 @@ func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, er
}
// Wait for the server to receive the message.
s.sync(index)
err = s.sync(index)
return index, nil
return index, err
}
// sync blocks until a given index (or a higher index) has been seen.
func (s *Server) sync(index uint64) {
// Returns any error associated with the command.
func (s *Server) sync(index uint64) error {
for {
if s.Index() >= index {
return
// Check if index has occurred. If so, retrieve the error and return.
s.mu.RLock()
if s.index >= index {
err, ok := s.errors[index]
if ok {
delete(s.errors, index)
}
s.mu.RUnlock()
return err
}
s.mu.RUnlock()
// Otherwise wait momentarily and check again.
time.Sleep(1 * time.Millisecond)
}
}
// Index returns the highest broadcast index received by the server.
func (s *Server) Index() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.index
}
@ -133,29 +152,52 @@ func (s *Server) Database(name string) *Database {
// CreateDatabase creates a new database.
func (s *Server) CreateDatabase(name string) error {
s.mu.Lock()
if s.databases[name] != nil {
s.mu.Unlock()
return ErrDatabaseExists
}
s.mu.Unlock()
_, err := s.broadcast(createDatabaseMessageType, &createDatabaseCommand{Name: name})
return err
}
func (s *Server) applyCreateDatabase(m *messaging.Message) {
func (s *Server) applyCreateDatabase(m *messaging.Message) error {
var c createDatabaseCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] != nil {
return
return ErrDatabaseExists
}
// Create database entry.
s.databases[c.Name] = &Database{name: c.Name}
return nil
}
type createDatabaseCommand struct {
Name string `json:"name"`
}
// DeleteDatabase deletes an existing database.
func (s *Server) DeleteDatabase(name string) error {
_, err := s.broadcast(deleteDatabaseMessageType, &deleteDatabaseCommand{Name: name})
return err
}
func (s *Server) applyDeleteDatabase(m *messaging.Message) error {
var c deleteDatabaseCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] == nil {
return ErrDatabaseNotFound
}
// Delete the database entry.
delete(s.databases, c.Name)
return nil
}
type deleteDatabaseCommand struct {
Name string `json:"name"`
}
// WriteSeries writes series data to the broker.
@ -177,30 +219,26 @@ func (s *Server) processor(done chan struct{}) {
}
// Process message.
var err error
switch m.Type {
case createDatabaseMessageType:
s.applyCreateDatabase(m)
err = s.applyCreateDatabase(m)
case deleteDatabaseMessageType:
err = s.applyDeleteDatabase(m)
}
// Sync high water mark for broadcast topic.
// Sync high water mark and errors for broadcast topic.
if m.TopicID == messaging.BroadcastTopicID {
s.mu.Lock()
s.index = m.Index
if err != nil {
s.errors[m.Index] = err
}
s.mu.Unlock()
}
}
}
const (
createDatabaseMessageType = messaging.MessageType(0x00)
deleteDatabaseMessageType = messaging.MessageType(0x01)
)
// createDatabaseCommand creates a new database.
type createDatabaseCommand struct {
Name string `json:"name"`
}
// MessagingClient represents the client used to receive messages from brokers.
type MessagingClient interface {
// Publishes a message to the broker.

View File

@ -25,8 +25,7 @@ func TestServer_Open(t *testing.T) {
// Ensure the server can create a database.
func TestServer_CreateDatabase(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create the "foo" database.
@ -42,6 +41,51 @@ func TestServer_CreateDatabase(t *testing.T) {
}
}
// Ensure the server returns an error when creating a duplicate database.
func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create the "foo" database twice.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}
if err := s.CreateDatabase("foo"); err != influxdb.ErrDatabaseExists {
t.Fatal(err)
}
}
// Ensure the server can drop a database.
func TestServer_DropDatabase(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create the "foo" database and verify it exists.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
} else if d := s.Database("foo"); d == nil {
t.Fatalf("database not actually created")
}
// Drop the "foo" database and verify that it's gone.
if err := s.DeleteDatabase("foo"); err != nil {
t.Fatal(err)
} else if d := s.Database("foo"); d != nil {
t.Fatalf("database not actually dropped")
}
}
// Ensure the server returns an error when dropping a database that doesn't exist.
func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Drop a database that doesn't exist.
if err := s.DeleteDatabase("no_such_db"); err != influxdb.ErrDatabaseNotFound {
t.Fatal(err)
}
}
// Server is a wrapping test struct for influxdb.Server.
type Server struct {
*influxdb.Server