From a04eb398cf393ee8a6a2911571e7be71c40ed14c Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 29 Dec 2014 16:12:51 -0700 Subject: [PATCH 1/5] Add node management. --- handler.go | 86 ++++++++++++++++++++++--- handler_test.go | 92 +++++++++++++++++++++++++++ influxdb.go | 12 ++++ metastore.go | 48 ++++++++++++-- server.go | 166 ++++++++++++++++++++++++++++++++++++++++++++---- server_test.go | 61 ++++++++++++++++++ shard.go | 3 + 7 files changed, 443 insertions(+), 25 deletions(-) diff --git a/handler.go b/handler.go index c655321a3e..b326043ce6 100644 --- a/handler.go +++ b/handler.go @@ -3,6 +3,8 @@ package influxdb import ( "encoding/json" "net/http" + "net/url" + "strconv" "strings" "github.com/bmizerany/pat" @@ -58,13 +60,14 @@ func NewHandler(s *Server) *Handler { h.mux.Put("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy)) h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy)) + // Node routes. + h.mux.Get("/nodes", http.HandlerFunc(h.serveNodes)) + h.mux.Post("/nodes", http.HandlerFunc(h.serveCreateNode)) + h.mux.Del("/nodes/:id", http.HandlerFunc(h.serveDeleteNode)) + // Utilities h.mux.Get("/ping", http.HandlerFunc(h.servePing)) - // Cluster config endpoints - h.mux.Get("/cluster/servers", http.HandlerFunc(h.serveServers)) - h.mux.Del("/cluster/servers/:id", http.HandlerFunc(h.serveDeleteServer)) - return h } @@ -413,11 +416,78 @@ func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Requ w.WriteHeader(http.StatusNoContent) } -// serveServers returns a list of servers in the cluster. -func (h *Handler) serveServers(w http.ResponseWriter, r *http.Request) {} +// serveNodes returns a list of all data nodes in the cluster. +func (h *Handler) serveNodes(w http.ResponseWriter, r *http.Request) { + // Generate a list of objects for encoding to the API. + a := make([]*nodeJSON, 0) + for _, n := range h.server.Nodes() { + a = append(a, &nodeJSON{ + ID: n.ID, + URL: n.URL.String(), + }) + } -// serveDeleteServer removes a server from the cluster. -func (h *Handler) serveDeleteServer(w http.ResponseWriter, r *http.Request) {} + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(a) +} + +// serveCreateNode creates a new node in the cluster. +func (h *Handler) serveCreateNode(w http.ResponseWriter, r *http.Request) { + // Read in node from request body. + var n nodeJSON + if err := json.NewDecoder(r.Body).Decode(&n); err != nil { + h.error(w, err.Error(), http.StatusBadRequest) + return + } + + // Parse the URL. + u, err := url.Parse(n.URL) + if err != nil { + h.error(w, "invalid node url", http.StatusBadRequest) + return + } + + // Create the node. + if err := h.server.CreateNode(u); err == ErrNodeExists { + h.error(w, err.Error(), http.StatusConflict) + return + } else if err != nil { + h.error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Write new node back to client. + node := h.server.NodeByURL(u) + w.WriteHeader(http.StatusCreated) + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&nodeJSON{ID: node.ID, URL: node.URL.String()}) +} + +// serveDeleteNode removes an existing node. +func (h *Handler) serveDeleteNode(w http.ResponseWriter, r *http.Request) { + // Parse node id. + nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64) + if err != nil { + h.error(w, "invalid node id", http.StatusBadRequest) + return + } + + // Delete the node. + if err := h.server.DeleteNode(nodeID); err == ErrNodeNotFound { + h.error(w, err.Error(), http.StatusNotFound) + return + } else if err != nil { + h.error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +type nodeJSON struct { + ID uint64 `json:"id"` + URL string `json:"url"` +} // error returns an error to the client in a standard format. func (h *Handler) error(w http.ResponseWriter, error string, code int) { diff --git a/handler_test.go b/handler_test.go index 92fbe535c6..2fe248703c 100644 --- a/handler_test.go +++ b/handler_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -516,6 +517,88 @@ func TestHandler_DeleteUser_UserNotFound(t *testing.T) { } } +func TestHandler_Nodes(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateNode(MustParseURL("http://localhost:1000")) + srvr.CreateNode(MustParseURL("http://localhost:2000")) + srvr.CreateNode(MustParseURL("http://localhost:3000")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("GET", s.URL+`/nodes`, "") + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } else if body != `[{"id":1,"url":"http://localhost:1000"},{"id":2,"url":"http://localhost:2000"},{"id":3,"url":"http://localhost:3000"}]` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateNode(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/nodes`, `{"url":"http://localhost:1000"}`) + if status != http.StatusCreated { + t.Fatalf("unexpected status: %d", status) + } else if body != `{"id":1,"url":"http://localhost:1000"}` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateNode_BadRequest(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/nodes`, `{"name":`) + if status != http.StatusBadRequest { + t.Fatalf("unexpected status: %d", status) + } else if body != `unexpected EOF` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_CreateNode_InternalServerError(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/nodes`, `{"url":""}`) + if status != http.StatusInternalServerError { + t.Fatalf("unexpected status: %d", status, body) + } else if body != `node url required` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_DeleteNode(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateNode(MustParseURL("http://localhost:1000")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("DELETE", s.URL+`/nodes/1`, "") + if status != http.StatusNoContent { + t.Fatalf("unexpected status: %d", status) + } else if body != `` { + t.Fatalf("unexpected body: %s", body) + } +} + +func TestHandler_DeleteUser_NodeNotFound(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("DELETE", s.URL+`/nodes/10000`, "") + if status != http.StatusNotFound { + t.Fatalf("unexpected status: %d", status) + } else if body != `node not found` { + t.Fatalf("unexpected body: %s", body) + } +} + func MustHTTP(verb, url, body string) (int, string) { req, err := http.NewRequest(verb, url, bytes.NewBuffer([]byte(body))) if err != nil { @@ -534,6 +617,15 @@ func MustHTTP(verb, url, body string) (int, string) { return resp.StatusCode, strings.TrimRight(string(b), "\n") } +// MustParseURL parses a string into a URL. Panic on error. +func MustParseURL(s string) *url.URL { + u, err := url.Parse(s) + if err != nil { + panic(err.Error()) + } + return u +} + // Server is a test HTTP server that wraps a handler type HTTPServer struct { *httptest.Server diff --git a/influxdb.go b/influxdb.go index ec8dca2e0e..7b1c22f505 100644 --- a/influxdb.go +++ b/influxdb.go @@ -17,6 +17,18 @@ var ( // ErrPathRequired is returned when opening a server without a path. ErrPathRequired = errors.New("path required") + // ErrNodeURLRequired is returned when creating a node without a URL. + ErrNodeURLRequired = errors.New("node url required") + + // ErrNodeExists is returned when creating a duplicate node. + ErrNodeExists = errors.New("node exists") + + // ErrNodeNotFound is returned when dropping a non-existent node. + ErrNodeNotFound = errors.New("node not found") + + // ErrNodeRequired is returned when using a blank node id. + ErrNodeRequired = errors.New("node required") + // ErrDatabaseNameRequired is returned when creating a database without a name. ErrDatabaseNameRequired = errors.New("database name required") diff --git a/metastore.go b/metastore.go index a28422f053..8dd9721bbf 100644 --- a/metastore.go +++ b/metastore.go @@ -1,6 +1,7 @@ package influxdb import ( + "encoding/binary" "sort" "strings" "time" @@ -42,6 +43,8 @@ func (m *metastore) close() error { // init initializes the metastore to ensure all top-level buckets are created. func (m *metastore) init() error { return m.db.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte("Server")) + _, _ = tx.CreateBucketIfNotExists([]byte("Nodes")) _, _ = tx.CreateBucketIfNotExists([]byte("Databases")) _, _ = tx.CreateBucketIfNotExists([]byte("Users")) return nil @@ -87,14 +90,41 @@ type metatx struct { *bolt.Tx } -// database returns a database from the metastore by name. -func (tx *metatx) database(name string) (db *database) { - if b := tx.Bucket([]byte("Databases")).Bucket([]byte(name)); b != nil { - mustUnmarshalJSON(b.Get([]byte("meta")), &db) +// id returns the server id. +func (tx *metatx) id() (id uint64) { + if v := tx.Bucket([]byte("Server")).Get([]byte("id")); v != nil { + id = btou64(v) } return } +// setID sets the server id. +func (tx *metatx) setID(v uint64) error { + return tx.Bucket([]byte("Server")).Put([]byte("id"), u64tob(v)) +} + +// databases returns a list of all nodes from the metastore. +func (tx *metatx) nodes() (a []*Node) { + c := tx.Bucket([]byte("Nodes")).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + n := newNode() + mustUnmarshalJSON(v, &n) + a = append(a, n) + } + sort.Sort(nodes(a)) + return +} + +// saveNode persists a node to the metastore. +func (tx *metatx) saveNode(n *Node) error { + return tx.Bucket([]byte("Nodes")).Put(u64tob(n.ID), mustMarshalJSON(n)) +} + +// deleteNode removes node from the metastore. +func (tx *metatx) deleteNode(id uint64) error { + return tx.Bucket([]byte("Nodes")).Delete(u64tob(id)) +} + // databases returns a list of all databases from the metastore. func (tx *metatx) databases() (a []*database) { c := tx.Bucket([]byte("Databases")).Cursor() @@ -251,3 +281,13 @@ func (tx *metatx) saveUser(u *User) error { func (tx *metatx) deleteUser(name string) error { return tx.Bucket([]byte("Users")).Delete([]byte(name)) } + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice into an int64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/server.go b/server.go index cde55544bb..7e69850ce2 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package influxdb import ( "encoding/json" "fmt" + "net/url" "os" "path/filepath" "regexp" @@ -37,30 +38,39 @@ const ( ) const ( - // broadcast messages - createDatabaseMessageType = messaging.MessageType(0x00) - deleteDatabaseMessageType = messaging.MessageType(0x01) + // Node messages + createNodeMessageType = messaging.MessageType(0x00) + deleteNodeMessageType = messaging.MessageType(0x01) - createRetentionPolicyMessageType = messaging.MessageType(0x10) - updateRetentionPolicyMessageType = messaging.MessageType(0x11) - deleteRetentionPolicyMessageType = messaging.MessageType(0x12) - setDefaultRetentionPolicyMessageType = messaging.MessageType(0x13) + // Database messages + createDatabaseMessageType = messaging.MessageType(0x10) + deleteDatabaseMessageType = messaging.MessageType(0x11) - createUserMessageType = messaging.MessageType(0x20) - updateUserMessageType = messaging.MessageType(0x21) - deleteUserMessageType = messaging.MessageType(0x22) + // Retention policy messages + createRetentionPolicyMessageType = messaging.MessageType(0x20) + updateRetentionPolicyMessageType = messaging.MessageType(0x21) + deleteRetentionPolicyMessageType = messaging.MessageType(0x22) + setDefaultRetentionPolicyMessageType = messaging.MessageType(0x23) - createShardIfNotExistsMessageType = messaging.MessageType(0x30) + // User messages + createUserMessageType = messaging.MessageType(0x30) + updateUserMessageType = messaging.MessageType(0x31) + deleteUserMessageType = messaging.MessageType(0x32) - createSeriesIfNotExistsMessageType = messaging.MessageType(0x40) + // Shard messages + createShardIfNotExistsMessageType = messaging.MessageType(0x40) - // per-topic messages + // Series messages + createSeriesIfNotExistsMessageType = messaging.MessageType(0x50) + + // Write raw data messages (per-topic) writeSeriesMessageType = messaging.MessageType(0x80) ) // Server represents a collection of metadata and raw metric data. type Server struct { mu sync.RWMutex + id uint64 path string done chan struct{} // goroutine close notification @@ -70,6 +80,9 @@ type Server struct { meta *metastore // metadata store + nodes map[uint64]*Node // server nodes by id + nodesByURL map[string]*Node // server nodes by url + databases map[string]*database // databases by name databasesByShard map[uint64]*database // databases by shard id users map[string]*User // user by name @@ -82,6 +95,8 @@ func NewServer(client MessagingClient) *Server { return &Server{ client: client, meta: &metastore{}, + nodes: make(map[uint64]*Node), + nodesByURL: make(map[string]*Node), databases: make(map[string]*database), databasesByShard: make(map[uint64]*database), users: make(map[string]*User), @@ -166,6 +181,9 @@ func (s *Server) Close() error { // load reads the state of the server from the metastore. func (s *Server) load() error { return s.meta.view(func(tx *metatx) error { + // Read server id. + s.id = tx.id() + // Load databases. s.databases = make(map[string]*database) for _, db := range tx.databases() { @@ -234,6 +252,107 @@ func (s *Server) sync(index uint64) error { } } +// Node returns a node by id. +func (s *Server) Node(id uint64) *Node { + s.mu.RLock() + defer s.mu.RUnlock() + return s.nodes[id] +} + +// NodeByURL returns a node by url. +func (s *Server) NodeByURL(u *url.URL) *Node { + s.mu.RLock() + defer s.mu.RUnlock() + return s.nodesByURL[u.String()] +} + +// Nodes returns a list of nodes. +func (s *Server) Nodes() (a []*Node) { + s.mu.RLock() + defer s.mu.RUnlock() + for _, n := range s.nodes { + a = append(a, n) + } + sort.Sort(nodes(a)) + return +} + +// CreateNode creates a new node with a given URL. +func (s *Server) CreateNode(u *url.URL) error { + c := &createNodeCommand{URL: u.String()} + _, err := s.broadcast(createNodeMessageType, c) + return err +} + +func (s *Server) applyCreateNode(m *messaging.Message) (err error) { + var c createNodeCommand + mustUnmarshalJSON(m.Data, &c) + + s.mu.Lock() + defer s.mu.Unlock() + + // Validate parameters. + if c.URL == "" { + return ErrNodeURLRequired + } + + // Check that another node doesn't already exist. + u, _ := url.Parse(c.URL) + if s.nodesByURL[u.String()] != nil { + return ErrNodeExists + } + + // Create node. + n := newNode() + n.ID = m.Index + n.URL = u + + // Persist to metastore. + err = s.meta.mustUpdate(func(tx *metatx) error { return tx.saveNode(n) }) + + // Add to node on server. + s.nodes[n.ID] = n + s.nodesByURL[n.URL.String()] = n + + return +} + +type createNodeCommand struct { + URL string `json:"url"` +} + +// DeleteNode deletes an existing node. +func (s *Server) DeleteNode(id uint64) error { + c := &deleteNodeCommand{ID: id} + _, err := s.broadcast(deleteNodeMessageType, c) + return err +} + +func (s *Server) applyDeleteNode(m *messaging.Message) (err error) { + var c deleteNodeCommand + mustUnmarshalJSON(m.Data, &c) + + s.mu.Lock() + defer s.mu.Unlock() + n := s.nodes[c.ID] + if n == nil { + return ErrNodeNotFound + } + + // Remove from metastore. + err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteNode(c.ID) }) + + // Delete the node. + delete(s.nodes, n.ID) + delete(s.nodesByURL, n.URL.String()) + + return +} + +type deleteNodeCommand struct { + ID uint64 `json:"id"` +} + // DatabaseExists returns true if a database exists. func (s *Server) DatabaseExists(name string) bool { s.mu.RLock() @@ -431,6 +550,8 @@ func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) (err error) { db.shards[sh.ID] = sh rp.Shards = append(rp.Shards, sh) + // TODO: Subscribe to shard if it matches the server's index. + return } @@ -949,6 +1070,10 @@ func (s *Server) processor(done chan struct{}) { switch m.Type { case writeSeriesMessageType: err = s.applyWriteSeries(m) + case createNodeMessageType: + err = s.applyCreateNode(m) + case deleteNodeMessageType: + err = s.applyDeleteNode(m) case createDatabaseMessageType: err = s.applyCreateDatabase(m) case deleteDatabaseMessageType: @@ -992,6 +1117,21 @@ type MessagingClient interface { C() <-chan *messaging.Message } +// Node represents a data node in the cluster. +type Node struct { + ID uint64 + URL *url.URL +} + +// newNode returns an instance of Node. +func newNode() *Node { return &Node{} } + +type nodes []*Node + +func (p nodes) Len() int { return len(p) } +func (p nodes) Less(i, j int) bool { return p[i].ID < p[j].ID } +func (p nodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + // database represents a collection of retention policies. type database struct { name string diff --git a/server_test.go b/server_test.go index bfa973aef6..4e6a8c2276 100644 --- a/server_test.go +++ b/server_test.go @@ -3,6 +3,7 @@ package influxdb_test import ( "fmt" "io/ioutil" + "net/url" "os" "reflect" "testing" @@ -32,6 +33,66 @@ func TestServer_Open_ErrServerOpen(t *testing.T) { t.Skip("pending") } // Ensure an error is returned when opening a server without a path. func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") } +// Ensure the server can create a new data node. +func TestServer_CreateNode(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a new node. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateNode(u); err != nil { + t.Fatal(err) + } + s.Restart() + + // Verify that the node exists. + if n := s.NodeByURL(u); n == nil { + t.Fatalf("node not found") + } else if n.URL.String() != "http://localhost:80000" { + t.Fatalf("unexpected url: %s", n.URL) + } else if n.ID == 0 { + t.Fatalf("unexpected id: %d", n.ID) + } +} + +// Ensure the server returns an error when creating a duplicate node. +func TestServer_CreateDatabase_ErrNodeExists(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a node with the same URL twice. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateNode(u); err != nil { + t.Fatal(err) + } + if err := s.CreateNode(u); err != influxdb.ErrNodeExists { + t.Fatal(err) + } +} + +// Ensure the server can delete a node. +func TestServer_DeleteNode(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a node and verify it exists. + u, _ := url.Parse("http://localhost:80000") + if err := s.CreateNode(u); err != nil { + t.Fatal(err) + } else if s.NodeByURL(u) == nil { + t.Fatalf("node not actually created") + } + s.Restart() + + // Drop the node and verify that it's gone. + n := s.NodeByURL(u) + if err := s.DeleteNode(n.ID); err != nil { + t.Fatal(err) + } else if s.Node(n.ID) != nil { + t.Fatalf("node not actually dropped") + } +} + // Ensure the server can create a database. func TestServer_CreateDatabase(t *testing.T) { s := OpenServer(NewMessagingClient()) diff --git a/shard.go b/shard.go index 57a83d2951..a292cb4727 100644 --- a/shard.go +++ b/shard.go @@ -16,6 +16,9 @@ type Shard struct { StartTime time.Time `json:"startTime,omitempty"` EndTime time.Time `json:"endTime,omitempty"` + replicaN []uint64 // replication factor + nodeIDs []uint64 // owner nodes + store *bolt.DB } From e88bd55762a034533686800e6e18f8bdb9c73a14 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 30 Dec 2014 08:50:15 -0700 Subject: [PATCH 2/5] Rename Node to DataNode. --- handler.go | 44 +++++++++--------- handler_test.go | 38 ++++++++-------- influxdb.go | 16 +++---- metastore.go | 24 +++++----- server.go | 117 +++++++++++++++++++++++++----------------------- server_test.go | 32 ++++++------- 6 files changed, 137 insertions(+), 134 deletions(-) diff --git a/handler.go b/handler.go index b326043ce6..e18f454cb8 100644 --- a/handler.go +++ b/handler.go @@ -60,10 +60,10 @@ func NewHandler(s *Server) *Handler { h.mux.Put("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy)) h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy)) - // Node routes. - h.mux.Get("/nodes", http.HandlerFunc(h.serveNodes)) - h.mux.Post("/nodes", http.HandlerFunc(h.serveCreateNode)) - h.mux.Del("/nodes/:id", http.HandlerFunc(h.serveDeleteNode)) + // Data node routes. + h.mux.Get("/data_nodes", http.HandlerFunc(h.serveDataNodes)) + h.mux.Post("/data_nodes", http.HandlerFunc(h.serveCreateDataNode)) + h.mux.Del("/data_nodes/:id", http.HandlerFunc(h.serveDeleteDataNode)) // Utilities h.mux.Get("/ping", http.HandlerFunc(h.servePing)) @@ -416,12 +416,12 @@ func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Requ w.WriteHeader(http.StatusNoContent) } -// serveNodes returns a list of all data nodes in the cluster. -func (h *Handler) serveNodes(w http.ResponseWriter, r *http.Request) { +// serveDataNodes returns a list of all data nodes in the cluster. +func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) { // Generate a list of objects for encoding to the API. - a := make([]*nodeJSON, 0) - for _, n := range h.server.Nodes() { - a = append(a, &nodeJSON{ + a := make([]*dataNodeJSON, 0) + for _, n := range h.server.DataNodes() { + a = append(a, &dataNodeJSON{ ID: n.ID, URL: n.URL.String(), }) @@ -431,10 +431,10 @@ func (h *Handler) serveNodes(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(a) } -// serveCreateNode creates a new node in the cluster. -func (h *Handler) serveCreateNode(w http.ResponseWriter, r *http.Request) { - // Read in node from request body. - var n nodeJSON +// serveCreateDataNode creates a new data node in the cluster. +func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) { + // Read in data node from request body. + var n dataNodeJSON if err := json.NewDecoder(r.Body).Decode(&n); err != nil { h.error(w, err.Error(), http.StatusBadRequest) return @@ -443,12 +443,12 @@ func (h *Handler) serveCreateNode(w http.ResponseWriter, r *http.Request) { // Parse the URL. u, err := url.Parse(n.URL) if err != nil { - h.error(w, "invalid node url", http.StatusBadRequest) + h.error(w, "invalid data node url", http.StatusBadRequest) return } - // Create the node. - if err := h.server.CreateNode(u); err == ErrNodeExists { + // Create the data node. + if err := h.server.CreateDataNode(u); err == ErrDataNodeExists { h.error(w, err.Error(), http.StatusConflict) return } else if err != nil { @@ -457,14 +457,14 @@ func (h *Handler) serveCreateNode(w http.ResponseWriter, r *http.Request) { } // Write new node back to client. - node := h.server.NodeByURL(u) + node := h.server.DataNodeByURL(u) w.WriteHeader(http.StatusCreated) w.Header().Add("content-type", "application/json") - _ = json.NewEncoder(w).Encode(&nodeJSON{ID: node.ID, URL: node.URL.String()}) + _ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()}) } -// serveDeleteNode removes an existing node. -func (h *Handler) serveDeleteNode(w http.ResponseWriter, r *http.Request) { +// serveDeleteDataNode removes an existing node. +func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) { // Parse node id. nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64) if err != nil { @@ -473,7 +473,7 @@ func (h *Handler) serveDeleteNode(w http.ResponseWriter, r *http.Request) { } // Delete the node. - if err := h.server.DeleteNode(nodeID); err == ErrNodeNotFound { + if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound { h.error(w, err.Error(), http.StatusNotFound) return } else if err != nil { @@ -484,7 +484,7 @@ func (h *Handler) serveDeleteNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -type nodeJSON struct { +type dataNodeJSON struct { ID uint64 `json:"id"` URL string `json:"url"` } diff --git a/handler_test.go b/handler_test.go index 2fe248703c..3b9e00885d 100644 --- a/handler_test.go +++ b/handler_test.go @@ -9,7 +9,7 @@ import ( "strings" "testing" "time" - //"fmt" + "github.com/influxdb/influxdb" ) @@ -517,15 +517,15 @@ func TestHandler_DeleteUser_UserNotFound(t *testing.T) { } } -func TestHandler_Nodes(t *testing.T) { +func TestHandler_DataNodes(t *testing.T) { srvr := OpenServer(NewMessagingClient()) - srvr.CreateNode(MustParseURL("http://localhost:1000")) - srvr.CreateNode(MustParseURL("http://localhost:2000")) - srvr.CreateNode(MustParseURL("http://localhost:3000")) + srvr.CreateDataNode(MustParseURL("http://localhost:1000")) + srvr.CreateDataNode(MustParseURL("http://localhost:2000")) + srvr.CreateDataNode(MustParseURL("http://localhost:3000")) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("GET", s.URL+`/nodes`, "") + status, body := MustHTTP("GET", s.URL+`/data_nodes`, "") if status != http.StatusOK { t.Fatalf("unexpected status: %d", status) } else if body != `[{"id":1,"url":"http://localhost:1000"},{"id":2,"url":"http://localhost:2000"},{"id":3,"url":"http://localhost:3000"}]` { @@ -533,12 +533,12 @@ func TestHandler_Nodes(t *testing.T) { } } -func TestHandler_CreateNode(t *testing.T) { +func TestHandler_CreateDataNode(t *testing.T) { srvr := OpenServer(NewMessagingClient()) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("POST", s.URL+`/nodes`, `{"url":"http://localhost:1000"}`) + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":"http://localhost:1000"}`) if status != http.StatusCreated { t.Fatalf("unexpected status: %d", status) } else if body != `{"id":1,"url":"http://localhost:1000"}` { @@ -546,12 +546,12 @@ func TestHandler_CreateNode(t *testing.T) { } } -func TestHandler_CreateNode_BadRequest(t *testing.T) { +func TestHandler_CreateDataNode_BadRequest(t *testing.T) { srvr := OpenServer(NewMessagingClient()) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("POST", s.URL+`/nodes`, `{"name":`) + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"name":`) if status != http.StatusBadRequest { t.Fatalf("unexpected status: %d", status) } else if body != `unexpected EOF` { @@ -559,26 +559,26 @@ func TestHandler_CreateNode_BadRequest(t *testing.T) { } } -func TestHandler_CreateNode_InternalServerError(t *testing.T) { +func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { srvr := OpenServer(NewMessagingClient()) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("POST", s.URL+`/nodes`, `{"url":""}`) + status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":""}`) if status != http.StatusInternalServerError { t.Fatalf("unexpected status: %d", status, body) - } else if body != `node url required` { + } else if body != `data node url required` { t.Fatalf("unexpected body: %s", body) } } -func TestHandler_DeleteNode(t *testing.T) { +func TestHandler_DeleteDataNode(t *testing.T) { srvr := OpenServer(NewMessagingClient()) - srvr.CreateNode(MustParseURL("http://localhost:1000")) + srvr.CreateDataNode(MustParseURL("http://localhost:1000")) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("DELETE", s.URL+`/nodes/1`, "") + status, body := MustHTTP("DELETE", s.URL+`/data_nodes/1`, "") if status != http.StatusNoContent { t.Fatalf("unexpected status: %d", status) } else if body != `` { @@ -586,15 +586,15 @@ func TestHandler_DeleteNode(t *testing.T) { } } -func TestHandler_DeleteUser_NodeNotFound(t *testing.T) { +func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { srvr := OpenServer(NewMessagingClient()) s := NewHTTPServer(srvr) defer s.Close() - status, body := MustHTTP("DELETE", s.URL+`/nodes/10000`, "") + status, body := MustHTTP("DELETE", s.URL+`/data_nodes/10000`, "") if status != http.StatusNotFound { t.Fatalf("unexpected status: %d", status) - } else if body != `node not found` { + } else if body != `data node not found` { t.Fatalf("unexpected body: %s", body) } } diff --git a/influxdb.go b/influxdb.go index 7b1c22f505..6aaa72b8cf 100644 --- a/influxdb.go +++ b/influxdb.go @@ -17,17 +17,17 @@ var ( // ErrPathRequired is returned when opening a server without a path. ErrPathRequired = errors.New("path required") - // ErrNodeURLRequired is returned when creating a node without a URL. - ErrNodeURLRequired = errors.New("node url required") + // ErrDataNodeURLRequired is returned when creating a data node without a URL. + ErrDataNodeURLRequired = errors.New("data node url required") - // ErrNodeExists is returned when creating a duplicate node. - ErrNodeExists = errors.New("node exists") + // ErrDataNodeExists is returned when creating a duplicate data node. + ErrDataNodeExists = errors.New("data node exists") - // ErrNodeNotFound is returned when dropping a non-existent node. - ErrNodeNotFound = errors.New("node not found") + // ErrDataNodeNotFound is returned when dropping a non-existent data node. + ErrDataNodeNotFound = errors.New("data node not found") - // ErrNodeRequired is returned when using a blank node id. - ErrNodeRequired = errors.New("node required") + // ErrDataNodeRequired is returned when using a blank data node id. + ErrDataNodeRequired = errors.New("data node required") // ErrDatabaseNameRequired is returned when creating a database without a name. ErrDatabaseNameRequired = errors.New("database name required") diff --git a/metastore.go b/metastore.go index 8dd9721bbf..b26325c80d 100644 --- a/metastore.go +++ b/metastore.go @@ -44,7 +44,7 @@ func (m *metastore) close() error { func (m *metastore) init() error { return m.db.Update(func(tx *bolt.Tx) error { _, _ = tx.CreateBucketIfNotExists([]byte("Server")) - _, _ = tx.CreateBucketIfNotExists([]byte("Nodes")) + _, _ = tx.CreateBucketIfNotExists([]byte("DataNodes")) _, _ = tx.CreateBucketIfNotExists([]byte("Databases")) _, _ = tx.CreateBucketIfNotExists([]byte("Users")) return nil @@ -103,26 +103,26 @@ func (tx *metatx) setID(v uint64) error { return tx.Bucket([]byte("Server")).Put([]byte("id"), u64tob(v)) } -// databases returns a list of all nodes from the metastore. -func (tx *metatx) nodes() (a []*Node) { - c := tx.Bucket([]byte("Nodes")).Cursor() +// databases returns a list of all data nodes from the metastore. +func (tx *metatx) dataNodes() (a []*DataNode) { + c := tx.Bucket([]byte("DataNodes")).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - n := newNode() + n := newDataNode() mustUnmarshalJSON(v, &n) a = append(a, n) } - sort.Sort(nodes(a)) + sort.Sort(dataNodes(a)) return } -// saveNode persists a node to the metastore. -func (tx *metatx) saveNode(n *Node) error { - return tx.Bucket([]byte("Nodes")).Put(u64tob(n.ID), mustMarshalJSON(n)) +// saveDataNode persists a data node to the metastore. +func (tx *metatx) saveDataNode(n *DataNode) error { + return tx.Bucket([]byte("DataNodes")).Put(u64tob(n.ID), mustMarshalJSON(n)) } -// deleteNode removes node from the metastore. -func (tx *metatx) deleteNode(id uint64) error { - return tx.Bucket([]byte("Nodes")).Delete(u64tob(id)) +// deleteDataNode removes data node from the metastore. +func (tx *metatx) deleteDataNode(id uint64) error { + return tx.Bucket([]byte("DataNodes")).Delete(u64tob(id)) } // databases returns a list of all databases from the metastore. diff --git a/server.go b/server.go index 7e69850ce2..3dfe4b16bb 100644 --- a/server.go +++ b/server.go @@ -38,9 +38,9 @@ const ( ) const ( - // Node messages - createNodeMessageType = messaging.MessageType(0x00) - deleteNodeMessageType = messaging.MessageType(0x01) + // Data node messages + createDataNodeMessageType = messaging.MessageType(0x00) + deleteDataNodeMessageType = messaging.MessageType(0x01) // Database messages createDatabaseMessageType = messaging.MessageType(0x10) @@ -80,8 +80,7 @@ type Server struct { meta *metastore // metadata store - nodes map[uint64]*Node // server nodes by id - nodesByURL map[string]*Node // server nodes by url + dataNodes map[uint64]*DataNode // data nodes by id databases map[string]*database // databases by name databasesByShard map[uint64]*database // databases by shard id @@ -95,8 +94,7 @@ func NewServer(client MessagingClient) *Server { return &Server{ client: client, meta: &metastore{}, - nodes: make(map[uint64]*Node), - nodesByURL: make(map[string]*Node), + dataNodes: make(map[uint64]*DataNode), databases: make(map[string]*database), databasesByShard: make(map[uint64]*database), users: make(map[string]*User), @@ -252,40 +250,45 @@ func (s *Server) sync(index uint64) error { } } -// Node returns a node by id. -func (s *Server) Node(id uint64) *Node { +// DataNode returns a data node by id. +func (s *Server) DataNode(id uint64) *DataNode { s.mu.RLock() defer s.mu.RUnlock() - return s.nodes[id] + return s.dataNodes[id] } -// NodeByURL returns a node by url. -func (s *Server) NodeByURL(u *url.URL) *Node { +// DataNodeByURL returns a data node by url. +func (s *Server) DataNodeByURL(u *url.URL) *DataNode { s.mu.RLock() defer s.mu.RUnlock() - return s.nodesByURL[u.String()] + for _, n := range s.dataNodes { + if n.URL.String() == u.String() { + return n + } + } + return nil } -// Nodes returns a list of nodes. -func (s *Server) Nodes() (a []*Node) { +// DataNodes returns a list of data nodes. +func (s *Server) DataNodes() (a []*DataNode) { s.mu.RLock() defer s.mu.RUnlock() - for _, n := range s.nodes { + for _, n := range s.dataNodes { a = append(a, n) } - sort.Sort(nodes(a)) + sort.Sort(dataNodes(a)) return } -// CreateNode creates a new node with a given URL. -func (s *Server) CreateNode(u *url.URL) error { - c := &createNodeCommand{URL: u.String()} - _, err := s.broadcast(createNodeMessageType, c) +// CreateDataNode creates a new data node with a given URL. +func (s *Server) CreateDataNode(u *url.URL) error { + c := &createDataNodeCommand{URL: u.String()} + _, err := s.broadcast(createDataNodeMessageType, c) return err } -func (s *Server) applyCreateNode(m *messaging.Message) (err error) { - var c createNodeCommand +func (s *Server) applyCreateDataNode(m *messaging.Message) (err error) { + var c createDataNodeCommand mustUnmarshalJSON(m.Data, &c) s.mu.Lock() @@ -293,63 +296,63 @@ func (s *Server) applyCreateNode(m *messaging.Message) (err error) { // Validate parameters. if c.URL == "" { - return ErrNodeURLRequired + return ErrDataNodeURLRequired } - // Check that another node doesn't already exist. + // Check that another node with the same URL doesn't already exist. u, _ := url.Parse(c.URL) - if s.nodesByURL[u.String()] != nil { - return ErrNodeExists + for _, n := range s.dataNodes { + if n.URL.String() == u.String() { + return ErrDataNodeExists + } } - // Create node. - n := newNode() + // Create data node. + n := newDataNode() n.ID = m.Index n.URL = u // Persist to metastore. - err = s.meta.mustUpdate(func(tx *metatx) error { return tx.saveNode(n) }) + err = s.meta.mustUpdate(func(tx *metatx) error { return tx.saveDataNode(n) }) // Add to node on server. - s.nodes[n.ID] = n - s.nodesByURL[n.URL.String()] = n + s.dataNodes[n.ID] = n return } -type createNodeCommand struct { +type createDataNodeCommand struct { URL string `json:"url"` } -// DeleteNode deletes an existing node. -func (s *Server) DeleteNode(id uint64) error { - c := &deleteNodeCommand{ID: id} - _, err := s.broadcast(deleteNodeMessageType, c) +// DeleteDataNode deletes an existing data node. +func (s *Server) DeleteDataNode(id uint64) error { + c := &deleteDataNodeCommand{ID: id} + _, err := s.broadcast(deleteDataNodeMessageType, c) return err } -func (s *Server) applyDeleteNode(m *messaging.Message) (err error) { - var c deleteNodeCommand +func (s *Server) applyDeleteDataNode(m *messaging.Message) (err error) { + var c deleteDataNodeCommand mustUnmarshalJSON(m.Data, &c) s.mu.Lock() defer s.mu.Unlock() - n := s.nodes[c.ID] + n := s.dataNodes[c.ID] if n == nil { - return ErrNodeNotFound + return ErrDataNodeNotFound } // Remove from metastore. - err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteNode(c.ID) }) + err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteDataNode(c.ID) }) // Delete the node. - delete(s.nodes, n.ID) - delete(s.nodesByURL, n.URL.String()) + delete(s.dataNodes, n.ID) return } -type deleteNodeCommand struct { +type deleteDataNodeCommand struct { ID uint64 `json:"id"` } @@ -1070,10 +1073,10 @@ func (s *Server) processor(done chan struct{}) { switch m.Type { case writeSeriesMessageType: err = s.applyWriteSeries(m) - case createNodeMessageType: - err = s.applyCreateNode(m) - case deleteNodeMessageType: - err = s.applyDeleteNode(m) + case createDataNodeMessageType: + err = s.applyCreateDataNode(m) + case deleteDataNodeMessageType: + err = s.applyDeleteDataNode(m) case createDatabaseMessageType: err = s.applyCreateDatabase(m) case deleteDatabaseMessageType: @@ -1117,20 +1120,20 @@ type MessagingClient interface { C() <-chan *messaging.Message } -// Node represents a data node in the cluster. -type Node struct { +// DataNode represents a data node in the cluster. +type DataNode struct { ID uint64 URL *url.URL } -// newNode returns an instance of Node. -func newNode() *Node { return &Node{} } +// newDataNode returns an instance of DataNode. +func newDataNode() *DataNode { return &DataNode{} } -type nodes []*Node +type dataNodes []*DataNode -func (p nodes) Len() int { return len(p) } -func (p nodes) Less(i, j int) bool { return p[i].ID < p[j].ID } -func (p nodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p dataNodes) Len() int { return len(p) } +func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID } +func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] } // database represents a collection of retention policies. type database struct { diff --git a/server_test.go b/server_test.go index 4e6a8c2276..95cd463bfe 100644 --- a/server_test.go +++ b/server_test.go @@ -34,20 +34,20 @@ func TestServer_Open_ErrServerOpen(t *testing.T) { t.Skip("pending") } func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") } // Ensure the server can create a new data node. -func TestServer_CreateNode(t *testing.T) { +func TestServer_CreateDataNode(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() // Create a new node. u, _ := url.Parse("http://localhost:80000") - if err := s.CreateNode(u); err != nil { + if err := s.CreateDataNode(u); err != nil { t.Fatal(err) } s.Restart() // Verify that the node exists. - if n := s.NodeByURL(u); n == nil { - t.Fatalf("node not found") + if n := s.DataNodeByURL(u); n == nil { + t.Fatalf("data node not found") } else if n.URL.String() != "http://localhost:80000" { t.Fatalf("unexpected url: %s", n.URL) } else if n.ID == 0 { @@ -56,40 +56,40 @@ func TestServer_CreateNode(t *testing.T) { } // Ensure the server returns an error when creating a duplicate node. -func TestServer_CreateDatabase_ErrNodeExists(t *testing.T) { +func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() // Create a node with the same URL twice. u, _ := url.Parse("http://localhost:80000") - if err := s.CreateNode(u); err != nil { + if err := s.CreateDataNode(u); err != nil { t.Fatal(err) } - if err := s.CreateNode(u); err != influxdb.ErrNodeExists { + if err := s.CreateDataNode(u); err != influxdb.ErrDataNodeExists { t.Fatal(err) } } // Ensure the server can delete a node. -func TestServer_DeleteNode(t *testing.T) { +func TestServer_DeleteDataNode(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() - // Create a node and verify it exists. + // Create a data node and verify it exists. u, _ := url.Parse("http://localhost:80000") - if err := s.CreateNode(u); err != nil { + if err := s.CreateDataNode(u); err != nil { t.Fatal(err) - } else if s.NodeByURL(u) == nil { - t.Fatalf("node not actually created") + } else if s.DataNodeByURL(u) == nil { + t.Fatalf("data node not actually created") } s.Restart() // Drop the node and verify that it's gone. - n := s.NodeByURL(u) - if err := s.DeleteNode(n.ID); err != nil { + n := s.DataNodeByURL(u) + if err := s.DeleteDataNode(n.ID); err != nil { t.Fatal(err) - } else if s.Node(n.ID) != nil { - t.Fatalf("node not actually dropped") + } else if s.DataNode(n.ID) != nil { + t.Fatalf("data node not actually dropped") } } From ebb82a91fd3e4c499bb53af4918db28532a4e13f Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 30 Dec 2014 08:53:07 -0700 Subject: [PATCH 3/5] Remove data node sort in the metastore. --- metastore.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metastore.go b/metastore.go index b26325c80d..d07368fc96 100644 --- a/metastore.go +++ b/metastore.go @@ -103,7 +103,7 @@ func (tx *metatx) setID(v uint64) error { return tx.Bucket([]byte("Server")).Put([]byte("id"), u64tob(v)) } -// databases returns a list of all data nodes from the metastore. +// dataNodes returns a list of all data nodes from the metastore. func (tx *metatx) dataNodes() (a []*DataNode) { c := tx.Bucket([]byte("DataNodes")).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { @@ -111,7 +111,6 @@ func (tx *metatx) dataNodes() (a []*DataNode) { mustUnmarshalJSON(v, &n) a = append(a, n) } - sort.Sort(dataNodes(a)) return } From 957ea91997d80713282afcf4add5e2e6dbb4c647 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 30 Dec 2014 09:01:19 -0700 Subject: [PATCH 4/5] Add DataNode autoincrementing sequence. --- metastore.go | 6 ++++++ server.go | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/metastore.go b/metastore.go index d07368fc96..394d80a3a7 100644 --- a/metastore.go +++ b/metastore.go @@ -114,6 +114,12 @@ func (tx *metatx) dataNodes() (a []*DataNode) { return } +// nextDataNodeID returns a autoincrementing id. +func (tx *metatx) nextDataNodeID() uint64 { + id, _ := tx.Bucket([]byte("DataNodes")).NextSequence() + return id +} + // saveDataNode persists a data node to the metastore. func (tx *metatx) saveDataNode(n *DataNode) error { return tx.Bucket([]byte("DataNodes")).Put(u64tob(n.ID), mustMarshalJSON(n)) diff --git a/server.go b/server.go index 3dfe4b16bb..70c9010d9f 100644 --- a/server.go +++ b/server.go @@ -309,11 +309,13 @@ func (s *Server) applyCreateDataNode(m *messaging.Message) (err error) { // Create data node. n := newDataNode() - n.ID = m.Index n.URL = u // Persist to metastore. - err = s.meta.mustUpdate(func(tx *metatx) error { return tx.saveDataNode(n) }) + err = s.meta.mustUpdate(func(tx *metatx) error { + n.ID = tx.nextDataNodeID() + return tx.saveDataNode(n) + }) // Add to node on server. s.dataNodes[n.ID] = n From bb7cdfb2af28200c1ea3bc33da68c6fc58bc358e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 30 Dec 2014 10:20:45 -0700 Subject: [PATCH 5/5] Reanme Shard.nodeIDs to Shard.dataNodeIDs. --- shard.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shard.go b/shard.go index a292cb4727..7c98a4b064 100644 --- a/shard.go +++ b/shard.go @@ -16,8 +16,8 @@ type Shard struct { StartTime time.Time `json:"startTime,omitempty"` EndTime time.Time `json:"endTime,omitempty"` - replicaN []uint64 // replication factor - nodeIDs []uint64 // owner nodes + replicaN []uint64 // replication factor + dataNodeIDs []uint64 // owner nodes store *bolt.DB }