Merge pull request #1272 from influxdb/nodes

Add node management
pull/1274/head
Ben Johnson 2014-12-30 10:44:24 -07:00
commit 29361bac41
7 changed files with 454 additions and 26 deletions

View File

@ -3,6 +3,8 @@ package influxdb
import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/bmizerany/pat"
@ -58,13 +60,14 @@ func NewHandler(s *Server) *Handler {
h.mux.Put("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy))
h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy))
// Data node routes.
h.mux.Get("/data_nodes", http.HandlerFunc(h.serveDataNodes))
h.mux.Post("/data_nodes", http.HandlerFunc(h.serveCreateDataNode))
h.mux.Del("/data_nodes/:id", http.HandlerFunc(h.serveDeleteDataNode))
// Utilities
h.mux.Get("/ping", http.HandlerFunc(h.servePing))
// Cluster config endpoints
h.mux.Get("/cluster/servers", http.HandlerFunc(h.serveServers))
h.mux.Del("/cluster/servers/:id", http.HandlerFunc(h.serveDeleteServer))
return h
}
@ -413,11 +416,78 @@ func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Requ
w.WriteHeader(http.StatusNoContent)
}
// serveServers returns a list of servers in the cluster.
func (h *Handler) serveServers(w http.ResponseWriter, r *http.Request) {}
// serveDataNodes returns a list of all data nodes in the cluster.
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
// Generate a list of objects for encoding to the API.
a := make([]*dataNodeJSON, 0)
for _, n := range h.server.DataNodes() {
a = append(a, &dataNodeJSON{
ID: n.ID,
URL: n.URL.String(),
})
}
// serveDeleteServer removes a server from the cluster.
func (h *Handler) serveDeleteServer(w http.ResponseWriter, r *http.Request) {}
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(a)
}
// serveCreateDataNode creates a new data node in the cluster.
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
// Read in data node from request body.
var n dataNodeJSON
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
h.error(w, err.Error(), http.StatusBadRequest)
return
}
// Parse the URL.
u, err := url.Parse(n.URL)
if err != nil {
h.error(w, "invalid data node url", http.StatusBadRequest)
return
}
// Create the data node.
if err := h.server.CreateDataNode(u); err == ErrDataNodeExists {
h.error(w, err.Error(), http.StatusConflict)
return
} else if err != nil {
h.error(w, err.Error(), http.StatusInternalServerError)
return
}
// Write new node back to client.
node := h.server.DataNodeByURL(u)
w.WriteHeader(http.StatusCreated)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()})
}
// serveDeleteDataNode removes an existing node.
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
// Parse node id.
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
if err != nil {
h.error(w, "invalid node id", http.StatusBadRequest)
return
}
// Delete the node.
if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound {
h.error(w, err.Error(), http.StatusNotFound)
return
} else if err != nil {
h.error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
type dataNodeJSON struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
// error returns an error to the client in a standard format.
func (h *Handler) error(w http.ResponseWriter, error string, code int) {

View File

@ -5,10 +5,11 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
//"fmt"
"github.com/influxdb/influxdb"
)
@ -516,6 +517,88 @@ func TestHandler_DeleteUser_UserNotFound(t *testing.T) {
}
}
func TestHandler_DataNodes(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
srvr.CreateDataNode(MustParseURL("http://localhost:2000"))
srvr.CreateDataNode(MustParseURL("http://localhost:3000"))
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("GET", s.URL+`/data_nodes`, "")
if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
} else if body != `[{"id":1,"url":"http://localhost:1000"},{"id":2,"url":"http://localhost:2000"},{"id":3,"url":"http://localhost:3000"}]` {
t.Fatalf("unexpected body: %s", body)
}
}
func TestHandler_CreateDataNode(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":"http://localhost:1000"}`)
if status != http.StatusCreated {
t.Fatalf("unexpected status: %d", status)
} else if body != `{"id":1,"url":"http://localhost:1000"}` {
t.Fatalf("unexpected body: %s", body)
}
}
func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"name":`)
if status != http.StatusBadRequest {
t.Fatalf("unexpected status: %d", status)
} else if body != `unexpected EOF` {
t.Fatalf("unexpected body: %s", body)
}
}
func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":""}`)
if status != http.StatusInternalServerError {
t.Fatalf("unexpected status: %d", status, body)
} else if body != `data node url required` {
t.Fatalf("unexpected body: %s", body)
}
}
func TestHandler_DeleteDataNode(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("DELETE", s.URL+`/data_nodes/1`, "")
if status != http.StatusNoContent {
t.Fatalf("unexpected status: %d", status)
} else if body != `` {
t.Fatalf("unexpected body: %s", body)
}
}
func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("DELETE", s.URL+`/data_nodes/10000`, "")
if status != http.StatusNotFound {
t.Fatalf("unexpected status: %d", status)
} else if body != `data node not found` {
t.Fatalf("unexpected body: %s", body)
}
}
func MustHTTP(verb, url, body string) (int, string) {
req, err := http.NewRequest(verb, url, bytes.NewBuffer([]byte(body)))
if err != nil {
@ -534,6 +617,15 @@ func MustHTTP(verb, url, body string) (int, string) {
return resp.StatusCode, strings.TrimRight(string(b), "\n")
}
// MustParseURL parses a string into a URL. Panic on error.
func MustParseURL(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err.Error())
}
return u
}
// Server is a test HTTP server that wraps a handler
type HTTPServer struct {
*httptest.Server

View File

@ -17,6 +17,18 @@ var (
// ErrPathRequired is returned when opening a server without a path.
ErrPathRequired = errors.New("path required")
// ErrDataNodeURLRequired is returned when creating a data node without a URL.
ErrDataNodeURLRequired = errors.New("data node url required")
// ErrDataNodeExists is returned when creating a duplicate data node.
ErrDataNodeExists = errors.New("data node exists")
// ErrDataNodeNotFound is returned when dropping a non-existent data node.
ErrDataNodeNotFound = errors.New("data node not found")
// ErrDataNodeRequired is returned when using a blank data node id.
ErrDataNodeRequired = errors.New("data node required")
// ErrDatabaseNameRequired is returned when creating a database without a name.
ErrDatabaseNameRequired = errors.New("database name required")

View File

@ -1,6 +1,7 @@
package influxdb
import (
"encoding/binary"
"sort"
"strings"
"time"
@ -42,6 +43,8 @@ func (m *metastore) close() error {
// init initializes the metastore to ensure all top-level buckets are created.
func (m *metastore) init() error {
return m.db.Update(func(tx *bolt.Tx) error {
_, _ = tx.CreateBucketIfNotExists([]byte("Server"))
_, _ = tx.CreateBucketIfNotExists([]byte("DataNodes"))
_, _ = tx.CreateBucketIfNotExists([]byte("Databases"))
_, _ = tx.CreateBucketIfNotExists([]byte("Users"))
return nil
@ -87,14 +90,46 @@ type metatx struct {
*bolt.Tx
}
// database returns a database from the metastore by name.
func (tx *metatx) database(name string) (db *database) {
if b := tx.Bucket([]byte("Databases")).Bucket([]byte(name)); b != nil {
mustUnmarshalJSON(b.Get([]byte("meta")), &db)
// id returns the server id.
func (tx *metatx) id() (id uint64) {
if v := tx.Bucket([]byte("Server")).Get([]byte("id")); v != nil {
id = btou64(v)
}
return
}
// setID sets the server id.
func (tx *metatx) setID(v uint64) error {
return tx.Bucket([]byte("Server")).Put([]byte("id"), u64tob(v))
}
// dataNodes returns a list of all data nodes from the metastore.
func (tx *metatx) dataNodes() (a []*DataNode) {
c := tx.Bucket([]byte("DataNodes")).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
n := newDataNode()
mustUnmarshalJSON(v, &n)
a = append(a, n)
}
return
}
// nextDataNodeID returns a autoincrementing id.
func (tx *metatx) nextDataNodeID() uint64 {
id, _ := tx.Bucket([]byte("DataNodes")).NextSequence()
return id
}
// saveDataNode persists a data node to the metastore.
func (tx *metatx) saveDataNode(n *DataNode) error {
return tx.Bucket([]byte("DataNodes")).Put(u64tob(n.ID), mustMarshalJSON(n))
}
// deleteDataNode removes data node from the metastore.
func (tx *metatx) deleteDataNode(id uint64) error {
return tx.Bucket([]byte("DataNodes")).Delete(u64tob(id))
}
// databases returns a list of all databases from the metastore.
func (tx *metatx) databases() (a []*database) {
c := tx.Bucket([]byte("Databases")).Cursor()
@ -251,3 +286,13 @@ func (tx *metatx) saveUser(u *User) error {
func (tx *metatx) deleteUser(name string) error {
return tx.Bucket([]byte("Users")).Delete([]byte(name))
}
// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, v)
return b
}
// btou64 converts an 8-byte slice into an int64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }

171
server.go
View File

@ -3,6 +3,7 @@ package influxdb
import (
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"regexp"
@ -37,30 +38,39 @@ const (
)
const (
// broadcast messages
createDatabaseMessageType = messaging.MessageType(0x00)
deleteDatabaseMessageType = messaging.MessageType(0x01)
// Data node messages
createDataNodeMessageType = messaging.MessageType(0x00)
deleteDataNodeMessageType = messaging.MessageType(0x01)
createRetentionPolicyMessageType = messaging.MessageType(0x10)
updateRetentionPolicyMessageType = messaging.MessageType(0x11)
deleteRetentionPolicyMessageType = messaging.MessageType(0x12)
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x13)
// Database messages
createDatabaseMessageType = messaging.MessageType(0x10)
deleteDatabaseMessageType = messaging.MessageType(0x11)
createUserMessageType = messaging.MessageType(0x20)
updateUserMessageType = messaging.MessageType(0x21)
deleteUserMessageType = messaging.MessageType(0x22)
// Retention policy messages
createRetentionPolicyMessageType = messaging.MessageType(0x20)
updateRetentionPolicyMessageType = messaging.MessageType(0x21)
deleteRetentionPolicyMessageType = messaging.MessageType(0x22)
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x23)
createShardIfNotExistsMessageType = messaging.MessageType(0x30)
// User messages
createUserMessageType = messaging.MessageType(0x30)
updateUserMessageType = messaging.MessageType(0x31)
deleteUserMessageType = messaging.MessageType(0x32)
createSeriesIfNotExistsMessageType = messaging.MessageType(0x40)
// Shard messages
createShardIfNotExistsMessageType = messaging.MessageType(0x40)
// per-topic messages
// Series messages
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
// Write raw data messages (per-topic)
writeSeriesMessageType = messaging.MessageType(0x80)
)
// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
@ -70,6 +80,8 @@ type Server struct {
meta *metastore // metadata store
dataNodes map[uint64]*DataNode // data nodes by id
databases map[string]*database // databases by name
databasesByShard map[uint64]*database // databases by shard id
users map[string]*User // user by name
@ -82,6 +94,7 @@ func NewServer(client MessagingClient) *Server {
return &Server{
client: client,
meta: &metastore{},
dataNodes: make(map[uint64]*DataNode),
databases: make(map[string]*database),
databasesByShard: make(map[uint64]*database),
users: make(map[string]*User),
@ -166,6 +179,9 @@ func (s *Server) Close() error {
// load reads the state of the server from the metastore.
func (s *Server) load() error {
return s.meta.view(func(tx *metatx) error {
// Read server id.
s.id = tx.id()
// Load databases.
s.databases = make(map[string]*database)
for _, db := range tx.databases() {
@ -234,6 +250,114 @@ func (s *Server) sync(index uint64) error {
}
}
// DataNode returns a data node by id.
func (s *Server) DataNode(id uint64) *DataNode {
s.mu.RLock()
defer s.mu.RUnlock()
return s.dataNodes[id]
}
// DataNodeByURL returns a data node by url.
func (s *Server) DataNodeByURL(u *url.URL) *DataNode {
s.mu.RLock()
defer s.mu.RUnlock()
for _, n := range s.dataNodes {
if n.URL.String() == u.String() {
return n
}
}
return nil
}
// DataNodes returns a list of data nodes.
func (s *Server) DataNodes() (a []*DataNode) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, n := range s.dataNodes {
a = append(a, n)
}
sort.Sort(dataNodes(a))
return
}
// CreateDataNode creates a new data node with a given URL.
func (s *Server) CreateDataNode(u *url.URL) error {
c := &createDataNodeCommand{URL: u.String()}
_, err := s.broadcast(createDataNodeMessageType, c)
return err
}
func (s *Server) applyCreateDataNode(m *messaging.Message) (err error) {
var c createDataNodeCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate parameters.
if c.URL == "" {
return ErrDataNodeURLRequired
}
// Check that another node with the same URL doesn't already exist.
u, _ := url.Parse(c.URL)
for _, n := range s.dataNodes {
if n.URL.String() == u.String() {
return ErrDataNodeExists
}
}
// Create data node.
n := newDataNode()
n.URL = u
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
n.ID = tx.nextDataNodeID()
return tx.saveDataNode(n)
})
// Add to node on server.
s.dataNodes[n.ID] = n
return
}
type createDataNodeCommand struct {
URL string `json:"url"`
}
// DeleteDataNode deletes an existing data node.
func (s *Server) DeleteDataNode(id uint64) error {
c := &deleteDataNodeCommand{ID: id}
_, err := s.broadcast(deleteDataNodeMessageType, c)
return err
}
func (s *Server) applyDeleteDataNode(m *messaging.Message) (err error) {
var c deleteDataNodeCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
n := s.dataNodes[c.ID]
if n == nil {
return ErrDataNodeNotFound
}
// Remove from metastore.
err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteDataNode(c.ID) })
// Delete the node.
delete(s.dataNodes, n.ID)
return
}
type deleteDataNodeCommand struct {
ID uint64 `json:"id"`
}
// DatabaseExists returns true if a database exists.
func (s *Server) DatabaseExists(name string) bool {
s.mu.RLock()
@ -431,6 +555,8 @@ func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) (err error) {
db.shards[sh.ID] = sh
rp.Shards = append(rp.Shards, sh)
// TODO: Subscribe to shard if it matches the server's index.
return
}
@ -949,6 +1075,10 @@ func (s *Server) processor(done chan struct{}) {
switch m.Type {
case writeSeriesMessageType:
err = s.applyWriteSeries(m)
case createDataNodeMessageType:
err = s.applyCreateDataNode(m)
case deleteDataNodeMessageType:
err = s.applyDeleteDataNode(m)
case createDatabaseMessageType:
err = s.applyCreateDatabase(m)
case deleteDatabaseMessageType:
@ -992,6 +1122,21 @@ type MessagingClient interface {
C() <-chan *messaging.Message
}
// DataNode represents a data node in the cluster.
type DataNode struct {
ID uint64
URL *url.URL
}
// newDataNode returns an instance of DataNode.
func newDataNode() *DataNode { return &DataNode{} }
type dataNodes []*DataNode
func (p dataNodes) Len() int { return len(p) }
func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID }
func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// database represents a collection of retention policies.
type database struct {
name string

View File

@ -3,6 +3,7 @@ package influxdb_test
import (
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"testing"
@ -32,6 +33,66 @@ func TestServer_Open_ErrServerOpen(t *testing.T) { t.Skip("pending") }
// Ensure an error is returned when opening a server without a path.
func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") }
// Ensure the server can create a new data node.
func TestServer_CreateDataNode(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create a new node.
u, _ := url.Parse("http://localhost:80000")
if err := s.CreateDataNode(u); err != nil {
t.Fatal(err)
}
s.Restart()
// Verify that the node exists.
if n := s.DataNodeByURL(u); n == nil {
t.Fatalf("data node not found")
} else if n.URL.String() != "http://localhost:80000" {
t.Fatalf("unexpected url: %s", n.URL)
} else if n.ID == 0 {
t.Fatalf("unexpected id: %d", n.ID)
}
}
// Ensure the server returns an error when creating a duplicate node.
func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create a node with the same URL twice.
u, _ := url.Parse("http://localhost:80000")
if err := s.CreateDataNode(u); err != nil {
t.Fatal(err)
}
if err := s.CreateDataNode(u); err != influxdb.ErrDataNodeExists {
t.Fatal(err)
}
}
// Ensure the server can delete a node.
func TestServer_DeleteDataNode(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create a data node and verify it exists.
u, _ := url.Parse("http://localhost:80000")
if err := s.CreateDataNode(u); err != nil {
t.Fatal(err)
} else if s.DataNodeByURL(u) == nil {
t.Fatalf("data node not actually created")
}
s.Restart()
// Drop the node and verify that it's gone.
n := s.DataNodeByURL(u)
if err := s.DeleteDataNode(n.ID); err != nil {
t.Fatal(err)
} else if s.DataNode(n.ID) != nil {
t.Fatalf("data node not actually dropped")
}
}
// Ensure the server can create a database.
func TestServer_CreateDatabase(t *testing.T) {
s := OpenServer(NewMessagingClient())

View File

@ -16,6 +16,9 @@ type Shard struct {
StartTime time.Time `json:"startTime,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
replicaN []uint64 // replication factor
dataNodeIDs []uint64 // owner nodes
store *bolt.DB
}