diff --git a/handler.go b/handler.go index c655321a3e..e18f454cb8 100644 --- a/handler.go +++ b/handler.go @@ -3,6 +3,8 @@ package influxdb import ( "encoding/json" "net/http" + "net/url" + "strconv" "strings" "github.com/bmizerany/pat" @@ -58,13 +60,14 @@ func NewHandler(s *Server) *Handler { h.mux.Put("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy)) h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy)) + // Data node routes. + h.mux.Get("/data_nodes", http.HandlerFunc(h.serveDataNodes)) + h.mux.Post("/data_nodes", http.HandlerFunc(h.serveCreateDataNode)) + h.mux.Del("/data_nodes/:id", http.HandlerFunc(h.serveDeleteDataNode)) + // Utilities h.mux.Get("/ping", http.HandlerFunc(h.servePing)) - // Cluster config endpoints - h.mux.Get("/cluster/servers", http.HandlerFunc(h.serveServers)) - h.mux.Del("/cluster/servers/:id", http.HandlerFunc(h.serveDeleteServer)) - return h } @@ -413,11 +416,78 @@ func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Requ w.WriteHeader(http.StatusNoContent) } -// serveServers returns a list of servers in the cluster. -func (h *Handler) serveServers(w http.ResponseWriter, r *http.Request) {} +// serveDataNodes returns a list of all data nodes in the cluster. +func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) { + // Generate a list of objects for encoding to the API. + a := make([]*dataNodeJSON, 0) + for _, n := range h.server.DataNodes() { + a = append(a, &dataNodeJSON{ + ID: n.ID, + URL: n.URL.String(), + }) + } -// serveDeleteServer removes a server from the cluster. -func (h *Handler) serveDeleteServer(w http.ResponseWriter, r *http.Request) {} + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(a) +} + +// serveCreateDataNode creates a new data node in the cluster. +func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) { + // Read in data node from request body. + var n dataNodeJSON + if err := json.NewDecoder(r.Body).Decode(&n); err != nil { + h.error(w, err.Error(), http.StatusBadRequest) + return + } + + // Parse the URL. + u, err := url.Parse(n.URL) + if err != nil { + h.error(w, "invalid data node url", http.StatusBadRequest) + return + } + + // Create the data node. + if err := h.server.CreateDataNode(u); err == ErrDataNodeExists { + h.error(w, err.Error(), http.StatusConflict) + return + } else if err != nil { + h.error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Write new node back to client. + node := h.server.DataNodeByURL(u) + w.WriteHeader(http.StatusCreated) + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()}) +} + +// serveDeleteDataNode removes an existing node. +func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) { + // Parse node id. + nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64) + if err != nil { + h.error(w, "invalid node id", http.StatusBadRequest) + return + } + + // Delete the node. + if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound { + h.error(w, err.Error(), http.StatusNotFound) + return + } else if err != nil { + h.error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +type dataNodeJSON struct { + ID uint64 `json:"id"` + URL string `json:"url"` +} // error returns an error to the client in a standard format. func (h *Handler) error(w http.ResponseWriter, error string, code int) { diff --git a/handler_test.go b/handler_test.go index 92fbe535c6..3b9e00885d 100644 --- a/handler_test.go +++ b/handler_test.go @@ -5,10 +5,11 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" - //"fmt" + "github.com/influxdb/influxdb" ) @@ -516,6 +517,88 @@ func TestHandler_DeleteUser_UserNotFound(t *testing.T) { } } +func TestHandler_DataNodes(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDataNode(MustParseURL("http://localhost:1000")) + srvr.CreateDataNode(MustParseURL("http://localhost:2000")) + srvr.CreateDataNode(MustParseURL("http://localhost:3000")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("GET", s.URL+`/data_nodes`, "") + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } else if body != `[{"id":1,"url":"http://localhost:1000"},{"id":2,"url":"http://localhost:2000"},{"id":3,"url":"http://localhost:3000"}]` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateDataNode(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":"http://localhost:1000"}`) + if status != http.StatusCreated { + t.Fatalf("unexpected status: %d", status) + } else if body != `{"id":1,"url":"http://localhost:1000"}` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateDataNode_BadRequest(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"name":`) + if status != http.StatusBadRequest { + t.Fatalf("unexpected status: %d", status) + } else if body != `unexpected EOF` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":""}`) + if status != http.StatusInternalServerError { + t.Fatalf("unexpected status: %d", status, body) + } else if body != `data node url required` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_DeleteDataNode(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDataNode(MustParseURL("http://localhost:1000")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("DELETE", s.URL+`/data_nodes/1`, "") + if status != http.StatusNoContent { + t.Fatalf("unexpected status: %d", status) + } else if body != `` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("DELETE", s.URL+`/data_nodes/10000`, "") + if status != http.StatusNotFound { + t.Fatalf("unexpected status: %d", status) + } else if body != `data node not found` { + t.Fatalf("unexpected body: %s", body) + } +} + func MustHTTP(verb, url, body string) (int, string) { req, err := http.NewRequest(verb, url, bytes.NewBuffer([]byte(body))) if err != nil { @@ -534,6 +617,15 @@ func MustHTTP(verb, url, body string) (int, string) { return resp.StatusCode, strings.TrimRight(string(b), "\n") } +// MustParseURL parses a string into a URL. Panic on error. +func MustParseURL(s string) *url.URL { + u, err := url.Parse(s) + if err != nil { + panic(err.Error()) + } + return u +} + // Server is a test HTTP server that wraps a handler type HTTPServer struct { *httptest.Server diff --git a/influxdb.go b/influxdb.go index ec8dca2e0e..6aaa72b8cf 100644 --- a/influxdb.go +++ b/influxdb.go @@ -17,6 +17,18 @@ var ( // ErrPathRequired is returned when opening a server without a path. ErrPathRequired = errors.New("path required") + // ErrDataNodeURLRequired is returned when creating a data node without a URL. + ErrDataNodeURLRequired = errors.New("data node url required") + + // ErrDataNodeExists is returned when creating a duplicate data node. + ErrDataNodeExists = errors.New("data node exists") + + // ErrDataNodeNotFound is returned when dropping a non-existent data node. + ErrDataNodeNotFound = errors.New("data node not found") + + // ErrDataNodeRequired is returned when using a blank data node id. + ErrDataNodeRequired = errors.New("data node required") + // ErrDatabaseNameRequired is returned when creating a database without a name. ErrDatabaseNameRequired = errors.New("database name required") diff --git a/metastore.go b/metastore.go index a28422f053..394d80a3a7 100644 --- a/metastore.go +++ b/metastore.go @@ -1,6 +1,7 @@ package influxdb import ( + "encoding/binary" "sort" "strings" "time" @@ -42,6 +43,8 @@ func (m *metastore) close() error { // init initializes the metastore to ensure all top-level buckets are created. func (m *metastore) init() error { return m.db.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte("Server")) + _, _ = tx.CreateBucketIfNotExists([]byte("DataNodes")) _, _ = tx.CreateBucketIfNotExists([]byte("Databases")) _, _ = tx.CreateBucketIfNotExists([]byte("Users")) return nil @@ -87,14 +90,46 @@ type metatx struct { *bolt.Tx } -// database returns a database from the metastore by name. -func (tx *metatx) database(name string) (db *database) { - if b := tx.Bucket([]byte("Databases")).Bucket([]byte(name)); b != nil { - mustUnmarshalJSON(b.Get([]byte("meta")), &db) +// id returns the server id. +func (tx *metatx) id() (id uint64) { + if v := tx.Bucket([]byte("Server")).Get([]byte("id")); v != nil { + id = btou64(v) } return } +// setID sets the server id. +func (tx *metatx) setID(v uint64) error { + return tx.Bucket([]byte("Server")).Put([]byte("id"), u64tob(v)) +} + +// dataNodes returns a list of all data nodes from the metastore. +func (tx *metatx) dataNodes() (a []*DataNode) { + c := tx.Bucket([]byte("DataNodes")).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + n := newDataNode() + mustUnmarshalJSON(v, &n) + a = append(a, n) + } + return +} + +// nextDataNodeID returns a autoincrementing id. +func (tx *metatx) nextDataNodeID() uint64 { + id, _ := tx.Bucket([]byte("DataNodes")).NextSequence() + return id +} + +// saveDataNode persists a data node to the metastore. +func (tx *metatx) saveDataNode(n *DataNode) error { + return tx.Bucket([]byte("DataNodes")).Put(u64tob(n.ID), mustMarshalJSON(n)) +} + +// deleteDataNode removes data node from the metastore. +func (tx *metatx) deleteDataNode(id uint64) error { + return tx.Bucket([]byte("DataNodes")).Delete(u64tob(id)) +} + // databases returns a list of all databases from the metastore. func (tx *metatx) databases() (a []*database) { c := tx.Bucket([]byte("Databases")).Cursor() @@ -251,3 +286,13 @@ func (tx *metatx) saveUser(u *User) error { func (tx *metatx) deleteUser(name string) error { return tx.Bucket([]byte("Users")).Delete([]byte(name)) } + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice into an int64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/server.go b/server.go index cde55544bb..70c9010d9f 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package influxdb import ( "encoding/json" "fmt" + "net/url" "os" "path/filepath" "regexp" @@ -37,30 +38,39 @@ const ( ) const ( - // broadcast messages - createDatabaseMessageType = messaging.MessageType(0x00) - deleteDatabaseMessageType = messaging.MessageType(0x01) + // Data node messages + createDataNodeMessageType = messaging.MessageType(0x00) + deleteDataNodeMessageType = messaging.MessageType(0x01) - createRetentionPolicyMessageType = messaging.MessageType(0x10) - updateRetentionPolicyMessageType = messaging.MessageType(0x11) - deleteRetentionPolicyMessageType = messaging.MessageType(0x12) - setDefaultRetentionPolicyMessageType = messaging.MessageType(0x13) + // Database messages + createDatabaseMessageType = messaging.MessageType(0x10) + deleteDatabaseMessageType = messaging.MessageType(0x11) - createUserMessageType = messaging.MessageType(0x20) - updateUserMessageType = messaging.MessageType(0x21) - deleteUserMessageType = messaging.MessageType(0x22) + // Retention policy messages + createRetentionPolicyMessageType = messaging.MessageType(0x20) + updateRetentionPolicyMessageType = messaging.MessageType(0x21) + deleteRetentionPolicyMessageType = messaging.MessageType(0x22) + setDefaultRetentionPolicyMessageType = messaging.MessageType(0x23) - createShardIfNotExistsMessageType = messaging.MessageType(0x30) + // User messages + createUserMessageType = messaging.MessageType(0x30) + updateUserMessageType = messaging.MessageType(0x31) + deleteUserMessageType = messaging.MessageType(0x32) - createSeriesIfNotExistsMessageType = messaging.MessageType(0x40) + // Shard messages + createShardIfNotExistsMessageType = messaging.MessageType(0x40) - // per-topic messages + // Series messages + createSeriesIfNotExistsMessageType = messaging.MessageType(0x50) + + // Write raw data messages (per-topic) writeSeriesMessageType = messaging.MessageType(0x80) ) // Server represents a collection of metadata and raw metric data. type Server struct { mu sync.RWMutex + id uint64 path string done chan struct{} // goroutine close notification @@ -70,6 +80,8 @@ type Server struct { meta *metastore // metadata store + dataNodes map[uint64]*DataNode // data nodes by id + databases map[string]*database // databases by name databasesByShard map[uint64]*database // databases by shard id users map[string]*User // user by name @@ -82,6 +94,7 @@ func NewServer(client MessagingClient) *Server { return &Server{ client: client, meta: &metastore{}, + dataNodes: make(map[uint64]*DataNode), databases: make(map[string]*database), databasesByShard: make(map[uint64]*database), users: make(map[string]*User), @@ -166,6 +179,9 @@ func (s *Server) Close() error { // load reads the state of the server from the metastore. func (s *Server) load() error { return s.meta.view(func(tx *metatx) error { + // Read server id. + s.id = tx.id() + // Load databases. s.databases = make(map[string]*database) for _, db := range tx.databases() { @@ -234,6 +250,114 @@ func (s *Server) sync(index uint64) error { } } +// DataNode returns a data node by id. +func (s *Server) DataNode(id uint64) *DataNode { + s.mu.RLock() + defer s.mu.RUnlock() + return s.dataNodes[id] +} + +// DataNodeByURL returns a data node by url. +func (s *Server) DataNodeByURL(u *url.URL) *DataNode { + s.mu.RLock() + defer s.mu.RUnlock() + for _, n := range s.dataNodes { + if n.URL.String() == u.String() { + return n + } + } + return nil +} + +// DataNodes returns a list of data nodes. +func (s *Server) DataNodes() (a []*DataNode) { + s.mu.RLock() + defer s.mu.RUnlock() + for _, n := range s.dataNodes { + a = append(a, n) + } + sort.Sort(dataNodes(a)) + return +} + +// CreateDataNode creates a new data node with a given URL. +func (s *Server) CreateDataNode(u *url.URL) error { + c := &createDataNodeCommand{URL: u.String()} + _, err := s.broadcast(createDataNodeMessageType, c) + return err +} + +func (s *Server) applyCreateDataNode(m *messaging.Message) (err error) { + var c createDataNodeCommand + mustUnmarshalJSON(m.Data, &c) + + s.mu.Lock() + defer s.mu.Unlock() + + // Validate parameters. + if c.URL == "" { + return ErrDataNodeURLRequired + } + + // Check that another node with the same URL doesn't already exist. + u, _ := url.Parse(c.URL) + for _, n := range s.dataNodes { + if n.URL.String() == u.String() { + return ErrDataNodeExists + } + } + + // Create data node. + n := newDataNode() + n.URL = u + + // Persist to metastore. + err = s.meta.mustUpdate(func(tx *metatx) error { + n.ID = tx.nextDataNodeID() + return tx.saveDataNode(n) + }) + + // Add to node on server. + s.dataNodes[n.ID] = n + + return +} + +type createDataNodeCommand struct { + URL string `json:"url"` +} + +// DeleteDataNode deletes an existing data node. +func (s *Server) DeleteDataNode(id uint64) error { + c := &deleteDataNodeCommand{ID: id} + _, err := s.broadcast(deleteDataNodeMessageType, c) + return err +} + +func (s *Server) applyDeleteDataNode(m *messaging.Message) (err error) { + var c deleteDataNodeCommand + mustUnmarshalJSON(m.Data, &c) + + s.mu.Lock() + defer s.mu.Unlock() + n := s.dataNodes[c.ID] + if n == nil { + return ErrDataNodeNotFound + } + + // Remove from metastore. + err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteDataNode(c.ID) }) + + // Delete the node. + delete(s.dataNodes, n.ID) + + return +} + +type deleteDataNodeCommand struct { + ID uint64 `json:"id"` +} + // DatabaseExists returns true if a database exists. func (s *Server) DatabaseExists(name string) bool { s.mu.RLock() @@ -431,6 +555,8 @@ func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) (err error) { db.shards[sh.ID] = sh rp.Shards = append(rp.Shards, sh) + // TODO: Subscribe to shard if it matches the server's index. + return } @@ -949,6 +1075,10 @@ func (s *Server) processor(done chan struct{}) { switch m.Type { case writeSeriesMessageType: err = s.applyWriteSeries(m) + case createDataNodeMessageType: + err = s.applyCreateDataNode(m) + case deleteDataNodeMessageType: + err = s.applyDeleteDataNode(m) case createDatabaseMessageType: err = s.applyCreateDatabase(m) case deleteDatabaseMessageType: @@ -992,6 +1122,21 @@ type MessagingClient interface { C() <-chan *messaging.Message } +// DataNode represents a data node in the cluster. +type DataNode struct { + ID uint64 + URL *url.URL +} + +// newDataNode returns an instance of DataNode. +func newDataNode() *DataNode { return &DataNode{} } + +type dataNodes []*DataNode + +func (p dataNodes) Len() int { return len(p) } +func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID } +func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + // database represents a collection of retention policies. type database struct { name string diff --git a/server_test.go b/server_test.go index bfa973aef6..95cd463bfe 100644 --- a/server_test.go +++ b/server_test.go @@ -3,6 +3,7 @@ package influxdb_test import ( "fmt" "io/ioutil" + "net/url" "os" "reflect" "testing" @@ -32,6 +33,66 @@ func TestServer_Open_ErrServerOpen(t *testing.T) { t.Skip("pending") } // Ensure an error is returned when opening a server without a path. func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") } +// Ensure the server can create a new data node. +func TestServer_CreateDataNode(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a new node. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateDataNode(u); err != nil { + t.Fatal(err) + } + s.Restart() + + // Verify that the node exists. + if n := s.DataNodeByURL(u); n == nil { + t.Fatalf("data node not found") + } else if n.URL.String() != "http://localhost:80000" { + t.Fatalf("unexpected url: %s", n.URL) + } else if n.ID == 0 { + t.Fatalf("unexpected id: %d", n.ID) + } +} + +// Ensure the server returns an error when creating a duplicate node. +func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a node with the same URL twice. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateDataNode(u); err != nil { + t.Fatal(err) + } + if err := s.CreateDataNode(u); err != influxdb.ErrDataNodeExists { + t.Fatal(err) + } +} + +// Ensure the server can delete a node. +func TestServer_DeleteDataNode(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a data node and verify it exists. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateDataNode(u); err != nil { + t.Fatal(err) + } else if s.DataNodeByURL(u) == nil { + t.Fatalf("data node not actually created") + } + s.Restart() + + // Drop the node and verify that it's gone. + n := s.DataNodeByURL(u) + if err := s.DeleteDataNode(n.ID); err != nil { + t.Fatal(err) + } else if s.DataNode(n.ID) != nil { + t.Fatalf("data node not actually dropped") + } +} + // Ensure the server can create a database. func TestServer_CreateDatabase(t *testing.T) { s := OpenServer(NewMessagingClient()) diff --git a/shard.go b/shard.go index 57a83d2951..7c98a4b064 100644 --- a/shard.go +++ b/shard.go @@ -16,6 +16,9 @@ type Shard struct { StartTime time.Time `json:"startTime,omitempty"` EndTime time.Time `json:"endTime,omitempty"` + replicaN []uint64 // replication factor + dataNodeIDs []uint64 // owner nodes + store *bolt.DB }