influxdb/server.go

538 lines
13 KiB
Go
Raw Normal View History

2014-10-22 05:32:19 +00:00
package influxdb
import (
2014-10-24 00:54:12 +00:00
"encoding/json"
2014-10-22 05:32:19 +00:00
"fmt"
2014-10-24 05:38:03 +00:00
"os"
2014-10-27 23:31:45 +00:00
"sort"
2014-10-22 05:32:19 +00:00
"sync"
"time"
"github.com/influxdb/influxdb/messaging"
2014-10-23 04:21:48 +00:00
"github.com/influxdb/influxdb/protocol"
2014-10-22 05:32:19 +00:00
)
2014-10-30 00:21:17 +00:00
const (
// DefaultRootPassword is the password initially set for the root user.
// It is also used when reseting the root user's password.
DefaultRootPassword = "root"
// DefaultShardSpaceName is the name of a databases's default shard space.
DefaultShardSpaceName = "default"
// DefaultSplitN represents the number of partitions a shard is split into.
DefaultSplitN = 1
// DefaultReplicaN represents the number of replicas data is written to.
DefaultReplicaN = 1
// DefaultShardDuration is the time period held by a shard.
DefaultShardDuration = 7 * (24 * time.Hour)
// DefaultShardRetention is the length of time before a shard is dropped.
DefaultShardRetention = time.Duration(0)
)
2014-10-24 23:45:02 +00:00
const (
2014-10-29 02:30:21 +00:00
createDatabaseMessageType = messaging.MessageType(0x00)
deleteDatabaseMessageType = messaging.MessageType(0x01)
createShardSpaceMessageType = messaging.MessageType(0x02)
deleteShardSpaceMessageType = messaging.MessageType(0x03)
createClusterAdminMessageType = messaging.MessageType(0x04)
deleteClusterAdminMessageType = messaging.MessageType(0x05)
2014-10-29 02:30:21 +00:00
clusterAdminSetPasswordMessageType = messaging.MessageType(0x06)
createDBUserMessageType = messaging.MessageType(0x07)
deleteDBUserMessageType = messaging.MessageType(0x08)
dbUserSetPasswordMessageType = messaging.MessageType(0x09)
2014-10-24 23:45:02 +00:00
)
2014-10-22 05:32:19 +00:00
// Server represents a collection of metadata and raw metric data.
type Server struct {
2014-10-24 05:38:03 +00:00
mu sync.RWMutex
path string
done chan struct{} // goroutine close notification
2014-10-24 23:45:02 +00:00
client MessagingClient // broker client
index uint64 // highest broadcast index seen
errors map[uint64]error // message errors
2014-10-24 00:54:12 +00:00
2014-10-24 05:38:03 +00:00
databases map[string]*Database
2014-10-29 02:30:21 +00:00
admins map[string]*ClusterAdmin
2014-10-22 05:32:19 +00:00
}
// NewServer returns a new instance of Server.
// The server requires a client to the messaging broker to be passed in.
func NewServer(client MessagingClient) *Server {
assert(client != nil, "messaging client required")
return &Server{
client: client,
2014-10-23 04:21:48 +00:00
databases: make(map[string]*Database),
2014-10-28 23:54:49 +00:00
admins: make(map[string]*ClusterAdmin),
2014-10-24 23:45:02 +00:00
errors: make(map[uint64]error),
2014-10-22 05:32:19 +00:00
}
}
2014-10-23 04:21:48 +00:00
// Path returns the path used when opening the server.
// Returns an empty string when the server is closed.
func (s *Server) Path() string { return s.path }
2014-10-22 05:32:19 +00:00
// Open initializes the server from a given path.
func (s *Server) Open(path string) error {
2014-10-24 00:54:12 +00:00
// Ensure the server isn't already open and there's a path provided.
2014-10-23 04:21:48 +00:00
if s.opened() {
return ErrServerOpen
} else if path == "" {
return ErrPathRequired
}
2014-10-24 05:38:03 +00:00
// Set the server path.
s.path = path
2014-10-24 00:54:12 +00:00
// Start goroutine to read messages from the broker.
s.done = make(chan struct{}, 0)
go s.processor(s.done)
2014-10-22 05:32:19 +00:00
return nil
}
2014-10-23 04:21:48 +00:00
// opened returns true when the server is open.
func (s *Server) opened() bool { return s.path != "" }
2014-10-22 05:32:19 +00:00
// Close shuts down the server.
func (s *Server) Close() error {
2014-10-24 00:54:12 +00:00
s.mu.Lock()
2014-10-24 05:38:03 +00:00
defer s.mu.Unlock()
2014-10-24 00:54:12 +00:00
if !s.opened() {
return ErrServerClosed
}
// Close notification.
close(s.done)
s.done = nil
// Remove path.
s.path = ""
2014-10-22 05:32:19 +00:00
return nil
}
2014-10-24 05:38:03 +00:00
// broadcast encodes a message as JSON and send it to the broker's broadcast topic.
// This function waits until the message has been processed by the server.
2014-10-24 00:54:12 +00:00
// Returns the broker log index of the message or an error.
2014-10-24 05:38:03 +00:00
func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) {
2014-10-24 00:54:12 +00:00
// Encode the command.
2014-10-24 05:38:03 +00:00
data, err := json.Marshal(c)
2014-10-24 00:54:12 +00:00
if err != nil {
return 0, err
}
// Publish the message.
2014-10-24 05:38:03 +00:00
m := &messaging.Message{
Type: typ,
TopicID: messaging.BroadcastTopicID,
Data: data,
}
index, err := s.client.Publish(m)
2014-10-24 00:54:12 +00:00
if err != nil {
2014-10-24 05:38:03 +00:00
return 0, err
2014-10-24 00:54:12 +00:00
}
2014-10-24 05:38:03 +00:00
// Wait for the server to receive the message.
2014-10-24 23:45:02 +00:00
err = s.sync(index)
2014-10-24 05:38:03 +00:00
2014-10-24 23:45:02 +00:00
return index, err
2014-10-24 05:38:03 +00:00
}
// sync blocks until a given index (or a higher index) has been seen.
2014-10-24 23:45:02 +00:00
// Returns any error associated with the command.
func (s *Server) sync(index uint64) error {
2014-10-24 05:38:03 +00:00
for {
2014-10-24 23:45:02 +00:00
// Check if index has occurred. If so, retrieve the error and return.
s.mu.RLock()
if s.index >= index {
err, ok := s.errors[index]
if ok {
delete(s.errors, index)
}
s.mu.RUnlock()
return err
2014-10-24 05:38:03 +00:00
}
2014-10-24 23:45:02 +00:00
s.mu.RUnlock()
// Otherwise wait momentarily and check again.
2014-10-24 05:38:03 +00:00
time.Sleep(1 * time.Millisecond)
2014-10-24 00:54:12 +00:00
}
2014-10-24 05:38:03 +00:00
}
2014-10-24 00:54:12 +00:00
2014-10-28 00:16:03 +00:00
// Database creates a new database.
2014-10-24 05:38:03 +00:00
func (s *Server) Database(name string) *Database {
s.mu.Lock()
defer s.mu.Unlock()
return s.databases[name]
2014-10-24 00:54:12 +00:00
}
2014-10-28 00:16:03 +00:00
// Databases returns a list of all databases, sorted by name.
func (s *Server) Databases() []*Database {
s.mu.Lock()
defer s.mu.Unlock()
var a databases
for _, db := range s.databases {
a = append(a, db)
}
sort.Sort(a)
return a
}
2014-10-24 00:54:12 +00:00
// CreateDatabase creates a new database.
func (s *Server) CreateDatabase(name string) error {
2014-10-25 17:54:23 +00:00
c := &createDatabaseCommand{Name: name}
_, err := s.broadcast(createDatabaseMessageType, c)
2014-10-24 05:38:03 +00:00
return err
2014-10-24 00:54:12 +00:00
}
2014-10-24 23:45:02 +00:00
func (s *Server) applyCreateDatabase(m *messaging.Message) error {
2014-10-24 05:38:03 +00:00
var c createDatabaseCommand
mustUnmarshal(m.Data, &c)
2014-10-24 00:54:12 +00:00
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] != nil {
2014-10-24 23:45:02 +00:00
return ErrDatabaseExists
2014-10-24 00:54:12 +00:00
}
// Create database entry.
2014-10-25 04:38:01 +00:00
db := newDatabase(s)
db.name = c.Name
s.databases[c.Name] = db
2014-10-24 23:45:02 +00:00
return nil
}
type createDatabaseCommand struct {
Name string `json:"name"`
}
// DeleteDatabase deletes an existing database.
func (s *Server) DeleteDatabase(name string) error {
2014-10-25 17:54:23 +00:00
c := &deleteDatabaseCommand{Name: name}
_, err := s.broadcast(deleteDatabaseMessageType, c)
2014-10-24 23:45:02 +00:00
return err
}
func (s *Server) applyDeleteDatabase(m *messaging.Message) error {
var c deleteDatabaseCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] == nil {
return ErrDatabaseNotFound
}
// Delete the database entry.
delete(s.databases, c.Name)
return nil
}
type deleteDatabaseCommand struct {
Name string `json:"name"`
2014-10-24 00:54:12 +00:00
}
2014-10-28 23:54:49 +00:00
// ClusterAdmin returns an admin by name.
// Returns nil if the admin does not exist.
func (s *Server) ClusterAdmin(name string) *ClusterAdmin {
s.mu.Lock()
defer s.mu.Unlock()
return s.admins[name]
}
// ClusterAdmins returns a list of all cluster admins, sorted by name.
func (s *Server) ClusterAdmins() []*ClusterAdmin {
s.mu.Lock()
defer s.mu.Unlock()
var a clusterAdmins
for _, u := range s.admins {
a = append(a, u)
}
sort.Sort(a)
return a
}
2014-10-28 23:54:49 +00:00
// CreateClusterAdmin creates a cluster admin on the server.
func (s *Server) CreateClusterAdmin(username, password string) error {
2014-10-28 23:54:49 +00:00
c := &createClusterAdminCommand{Username: username, Password: password}
_, err := s.broadcast(createClusterAdminMessageType, c)
return err
}
func (s *Server) applyCreateClusterAdmin(m *messaging.Message) error {
var c createClusterAdminCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate admin.
if c.Username == "" {
return ErrUsernameRequired
} else if s.admins[c.Username] != nil {
return ErrClusterAdminExists
}
// Generate the hash of the password.
hash, err := HashPassword(c.Password)
if err != nil {
return err
}
// Create the cluster admin.
s.admins[c.Username] = &ClusterAdmin{
CommonUser: CommonUser{
Name: c.Username,
Hash: string(hash),
},
}
return nil
}
type createClusterAdminCommand struct {
Username string `json:"username"`
Password string `json:"password"`
}
2014-10-29 00:43:03 +00:00
// DeleteClusterAdmin removes a cluster admin from the server.
func (s *Server) DeleteClusterAdmin(username string) error {
c := &deleteClusterAdminCommand{Username: username}
_, err := s.broadcast(deleteClusterAdminMessageType, c)
return err
}
func (s *Server) applyDeleteClusterAdmin(m *messaging.Message) error {
var c deleteClusterAdminCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate admin.
if c.Username == "" {
return ErrUsernameRequired
} else if s.admins[c.Username] == nil {
return ErrClusterAdminNotFound
}
// Delete the cluster admin.
delete(s.admins, c.Username)
return nil
}
type deleteClusterAdminCommand struct {
Username string `json:"username"`
}
2014-10-28 23:54:49 +00:00
func (s *Server) applyDBUserSetPassword(m *messaging.Message) error {
var c dbUserSetPasswordCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
return db.changePassword(c.Username, c.Password)
}
2014-10-25 04:38:01 +00:00
func (s *Server) applyCreateDBUser(m *messaging.Message) error {
var c createDBUserCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
2014-10-29 02:30:21 +00:00
return db.applyCreateUser(c.Username, c.Password, c.Permissions)
2014-10-25 04:38:01 +00:00
}
type createDBUserCommand struct {
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
Permissions []string `json:"permissions"`
}
2014-10-25 17:54:23 +00:00
func (s *Server) applyDeleteDBUser(m *messaging.Message) error {
var c deleteDBUserCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
2014-10-29 02:30:21 +00:00
return db.applyDeleteUser(c.Username)
2014-10-25 17:54:23 +00:00
}
type deleteDBUserCommand struct {
Database string `json:"database"`
Username string `json:"username"`
}
2014-10-29 02:30:21 +00:00
type dbUserSetPasswordCommand struct {
2014-10-25 19:30:41 +00:00
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
}
func (s *Server) applyCreateShardSpace(m *messaging.Message) error {
var c createShardSpaceCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
return db.applyCreateShardSpace(c.Name, c.Regex, c.Retention, c.Duration, c.ReplicaN, c.SplitN)
}
type createShardSpaceCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Regex string `json:"regex"`
Retention time.Duration `json:"retention"`
Duration time.Duration `json:"duration"`
ReplicaN uint32 `json:"replicaN"`
SplitN uint32 `json:"splitN"`
}
func (s *Server) applyDeleteShardSpace(m *messaging.Message) error {
var c deleteShardSpaceCommand
mustUnmarshal(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
return db.applyDeleteShardSpace(c.Name)
}
type deleteShardSpaceCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
2014-10-23 04:21:48 +00:00
// WriteSeries writes series data to the broker.
func (s *Server) WriteSeries(u *ClusterAdmin, database string, series *protocol.Series) error {
// TODO:
return nil
}
2014-10-24 00:54:12 +00:00
// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Server) processor(done chan struct{}) {
client := s.client
for {
// Read incoming message.
var m *messaging.Message
select {
case <-done:
return
case m = <-client.C():
}
// Process message.
2014-10-24 23:45:02 +00:00
var err error
2014-10-24 00:54:12 +00:00
switch m.Type {
case createDatabaseMessageType:
2014-10-24 23:45:02 +00:00
err = s.applyCreateDatabase(m)
case deleteDatabaseMessageType:
err = s.applyDeleteDatabase(m)
2014-10-28 23:54:49 +00:00
case createClusterAdminMessageType:
err = s.applyCreateClusterAdmin(m)
2014-10-29 00:43:03 +00:00
case deleteClusterAdminMessageType:
err = s.applyDeleteClusterAdmin(m)
2014-10-25 04:38:01 +00:00
case createDBUserMessageType:
err = s.applyCreateDBUser(m)
2014-10-25 17:54:23 +00:00
case deleteDBUserMessageType:
err = s.applyDeleteDBUser(m)
2014-10-29 02:30:21 +00:00
case dbUserSetPasswordMessageType:
err = s.applyDBUserSetPassword(m)
case createShardSpaceMessageType:
err = s.applyCreateShardSpace(m)
case deleteShardSpaceMessageType:
err = s.applyDeleteShardSpace(m)
2014-10-24 05:38:03 +00:00
}
2014-10-24 23:45:02 +00:00
// Sync high water mark and errors for broadcast topic.
2014-10-24 05:38:03 +00:00
if m.TopicID == messaging.BroadcastTopicID {
s.mu.Lock()
s.index = m.Index
2014-10-24 23:45:02 +00:00
if err != nil {
s.errors[m.Index] = err
}
2014-10-24 05:38:03 +00:00
s.mu.Unlock()
2014-10-24 00:54:12 +00:00
}
}
}
2014-10-22 05:32:19 +00:00
// MessagingClient represents the client used to receive messages from brokers.
type MessagingClient interface {
2014-10-24 05:38:03 +00:00
// Publishes a message to the broker.
Publish(m *messaging.Message) (index uint64, err error)
// The streaming channel for all subscribed messages.
2014-10-22 05:32:19 +00:00
C() <-chan *messaging.Message
}
2014-10-24 00:54:12 +00:00
// mustMarshal encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshal(v interface{}) []byte {
b, err := json.Marshal(v)
if err != nil {
panic("marshal: " + err.Error())
}
return b
}
// mustUnmarshal decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshal(b []byte, v interface{}) {
if err := json.Unmarshal(b, v); err != nil {
panic("unmarshal: " + err.Error())
}
}
2014-10-22 05:32:19 +00:00
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
2014-10-24 05:38:03 +00:00
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }