Wire up meta service functionality

* Add dir, hostname, and bind address to top level config since it applies to services other than meta
* Add enabled flags to example toml for data and meta services
* Wire up add/remove raft peers and meta servers to meta service
* Update DROP SERVER to be either DROP META SERVER or DROP DATA SERVER
* Bring over statement executor from old meta package
* Start meta service client implementation
* Update meta service test to use the client
* Wire up node ID/meta server storage information
pull/5428/head
Paul Dix 2015-12-30 08:15:00 -05:00 committed by David Norton
parent 688bc7a2f1
commit c9d82ad0ad
24 changed files with 3029 additions and 955 deletions

View File

@ -100,7 +100,7 @@ func (cmd *Command) Run(args ...string) error {
}
if options.Join != "" {
config.Meta.Peers = strings.Split(options.Join, ",")
config.Meta.JoinPeers = strings.Split(options.Join, ",")
}
// Validate the configuration.

View File

@ -45,13 +45,20 @@ type Config struct {
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPs []udp.Config `toml:"udp"`
// Snapshot SnapshotConfig `toml:"snapshot"`
ContinuousQuery continuous_querier.Config `toml:"continuous_queries"`
HintedHandoff hh.Config `toml:"hinted-handoff"`
HintedHandoff hh.Config `toml:"hinted-handoff"`
// Server reporting
ReportingDisabled bool `toml:"reporting-disabled"`
Dir string `toml:"dir"`
// BindAddress is the address that all TCP services use (Raft, Snapshot, Cluster, etc.)
BindAddress `toml:"bind-address"`
// Hostname is the resolvable name for other servers in
// the cluster to reach this server
Hostname string `toml:"hostname"`
}
// NewConfig returns an instance of Config with reasonable defaults.

View File

@ -87,15 +87,20 @@ type Server struct {
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
node, err := influxdb.NewNode(c.Dir)
if err != nil {
return nil, err
}
s := &Server{
buildInfo: *buildInfo,
err: make(chan error),
closing: make(chan struct{}),
Hostname: c.Meta.Hostname,
BindAddress: c.Meta.RaftBindAddress,
BindAddress: c.Meta.BindAddress,
Node: influxdb.NewNode(),
Node: node,
MetaClient: meta.NewClient(c.Meta),
Monitor: monitor.New(c.Monitor),
@ -364,6 +369,7 @@ func (s *Server) Open() error {
// Multiplex listener.
mux := tcp.NewMux()
s.MetaService.RaftListener = mux.Listen(meta.MuxHeader)
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
s.CopierService.Listener = mux.Listen(copier.MuxHeader)

View File

@ -8,6 +8,14 @@
# Change this option to true to disable reporting.
reporting-disabled = false
# directory where server ID and cluster metaservers information will be kept
dir = "/var/lib/influxdb"
# we'll try to get the hostname automatically, but if the OS returns something
# that isn't resolvable by other servers in the cluster, use this option to
# manually set the hostname
# hostname = "localhost"
###
### [meta]
###
@ -16,8 +24,12 @@ reporting-disabled = false
###
[meta]
# Controls if this node should run the metaservice and participate in the Raft group
enabled = true
# Where the metadata/raft database is stored
dir = "/var/lib/influxdb/meta"
hostname = "localhost"
bind-address = ":8088"
retention-autocreate = true
election-timeout = "1s"
@ -26,13 +38,6 @@ reporting-disabled = false
commit-timeout = "50ms"
cluster-tracing = false
# If enabled, when a Raft cluster loses a peer due to a `DROP SERVER` command,
# the leader will automatically ask a non-raft peer node to promote to a raft
# peer. This only happens if there is a non-raft peer node available to promote.
# This setting only affects the local node, so to ensure it operates correctly, be sure to set
# it in the config of every node.
raft-promotion-enabled = true
###
### [data]
###
@ -43,6 +48,9 @@ reporting-disabled = false
###
[data]
# Controls if this node holds time series data shards in the cluster
enabled = true
dir = "/var/lib/influxdb/data"
# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't

View File

@ -1884,18 +1884,21 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
// DropServerStatement represents a command for removing a meta or data
// server from the cluster.
type DropServerStatement struct {
	// ID of the node to be dropped.
	NodeID uint64

	// Force will force the server to drop even if it means losing data.
	// Retained for backward compatibility; the parser no longer sets it.
	Force bool

	// Meta indicates if the server being dropped is a meta or data node
	Meta bool
}

// String returns a string representation of the drop server statement.
func (s *DropServerStatement) String() string {
	var buf bytes.Buffer
	// Emit the statement keyword in one piece so there are no doubled or
	// trailing spaces in the rendered statement.
	if s.Meta {
		_, _ = buf.WriteString("DROP META SERVER ")
	} else {
		_, _ = buf.WriteString("DROP DATA SERVER ")
	}
	_, _ = buf.WriteString(strconv.FormatUint(s.NodeID, 10))
	return buf.String()
}

View File

@ -220,8 +220,8 @@ func (p *Parser) parseDropStatement() (Statement, error) {
return p.parseDropRetentionPolicyStatement()
} else if tok == USER {
return p.parseDropUserStatement()
} else if tok == SERVER {
return p.parseDropServerStatement()
} else if tok == META || tok == DATA {
return p.parseDropServerStatement(tok)
} else if tok == SUBSCRIPTION {
return p.parseDropSubscriptionStatement()
}
@ -1308,23 +1308,25 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
}
// parseDropServerStatement parses a string and returns a DropServerStatement.
// This function assumes the "DROP SERVER" tokens have already been consumed.
func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) {
// This function assumes the "DROP <META|DATA>" tokens have already been consumed.
func (p *Parser) parseDropServerStatement(tok Token) (*DropServerStatement, error) {
// Parse the SERVER token
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SERVER {
return nil, newParseError(tokstr(tok, lit), []string{"SERVER"}, pos)
}
s := &DropServerStatement{}
var err error
if tok == META {
s.Meta = true
}
// Parse the server's ID.
if s.NodeID, err = p.parseUInt64(); err != nil {
return nil, err
}
// Parse optional FORCE token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == FORCE {
s.Force = true
} else if tok != EOF && tok != SEMICOLON {
return nil, newParseError(tokstr(tok, lit), []string{"FORCE"}, pos)
}
return s, nil
}

View File

@ -1035,12 +1035,12 @@ func TestParser_ParseStatement(t *testing.T) {
// DROP SERVER statement
{
s: `DROP SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123},
s: `DROP META SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123, Meta: true},
},
{
s: `DROP SERVER 123 FORCE`,
stmt: &influxql.DropServerStatement{NodeID: 123, Force: true},
s: `DROP DATA SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123, Meta: false},
},
// SHOW CONTINUOUS QUERIES statement
@ -1743,9 +1743,8 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DROP SERIES`, err: `found EOF, expected FROM, WHERE at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERVER`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERVER abc`, err: `found abc, expected number at line 1, char 13`},
{s: `DROP SERVER 1 1`, err: `found 1, expected FORCE at line 1, char 15`},
{s: `DROP META SERVER`, err: `found EOF, expected number at line 1, char 18`},
{s: `DROP DATA SERVER abc`, err: `found abc, expected number at line 1, char 18`},
{s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION ON`, err: `found ON, expected POLICIES at line 1, char 16`},

View File

@ -65,6 +65,7 @@ const (
BY
CREATE
CONTINUOUS
DATA
DATABASE
DATABASES
DEFAULT
@ -96,6 +97,7 @@ const (
KEY
KEYS
LIMIT
META
MEASUREMENT
MEASUREMENTS
NAME
@ -186,6 +188,7 @@ var tokens = [...]string{
BY: "BY",
CREATE: "CREATE",
CONTINUOUS: "CONTINUOUS",
DATA: "DATA",
DATABASE: "DATABASE",
DATABASES: "DATABASES",
DEFAULT: "DEFAULT",
@ -219,6 +222,7 @@ var tokens = [...]string{
LIMIT: "LIMIT",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",
META: "META",
NAME: "NAME",
NOT: "NOT",
OFFSET: "OFFSET",

48
node.go
View File

@ -1,19 +1,47 @@
package influxdb
import "sync"
import (
"encoding/json"
"os"
"path/filepath"
)
const nodeFile = "node.json"

// Node holds this process's identity within the cluster: its unique ID and
// the meta servers it knows about. It is persisted as JSON in nodeFile
// inside path.
type Node struct {
	path        string // directory containing the node file
	ID          uint64
	MetaServers []string
}

// NewNode will load the node information from disk if present. A missing
// node file is not an error: a fresh Node containing only the path is
// returned so it can be populated and saved later.
func NewNode(path string) (*Node, error) {
	n := &Node{
		path: path,
	}

	f, err := os.Open(filepath.Join(path, nodeFile))
	if err != nil {
		// First boot: no node file exists yet. Returning the empty node here
		// also avoids using a nil *os.File below.
		if os.IsNotExist(err) {
			return n, nil
		}
		return nil, err
	}
	defer f.Close()

	if err := json.NewDecoder(f).Decode(n); err != nil {
		return nil, err
	}
	return n, nil
}

// Save will save the node file to disk and replace the existing one if
// present. The data is written to a temporary file first and then renamed
// into place so readers never observe a partially written file.
func (n *Node) Save() error {
	file := filepath.Join(n.path, nodeFile)
	tmpFile := file + ".tmp"

	f, err := os.Create(tmpFile)
	if err != nil {
		return err
	}

	if err := json.NewEncoder(f).Encode(n); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	return os.Rename(tmpFile, file)
}

View File

@ -1,16 +1,66 @@
package meta
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/services/meta/internal"
"github.com/gogo/protobuf/proto"
)
const errSleep = 10 * time.Millisecond
type Client struct {
tls bool
logger *log.Logger
mu sync.RWMutex
metaServers []string
changed chan struct{}
closing chan struct{}
data *Data
executor *StatementExecutor
}
func NewClient(c *Config) *Client {
return &Client{}
func NewClient(metaServers []string, tls bool) *Client {
client := &Client{
data: &Data{},
metaServers: metaServers,
tls: tls,
logger: log.New(os.Stderr, "[metaclient] ", log.LstdFlags),
}
client.executor = &StatementExecutor{Store: client}
return client
}
func (c *Client) Open() error {
c.changed = make(chan struct{})
c.closing = make(chan struct{})
c.data = c.retryUntilSnapshot(0)
go c.pollForUpdates()
return nil
}
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
close(c.closing)
return nil
}
func (c *Client) ClusterID() (id uint64, err error) {
@ -18,12 +68,48 @@ func (c *Client) ClusterID() (id uint64, err error) {
}
// Node returns a node by id.
func (c *Client) Node(id uint64) (*NodeInfo, error) {
func (c *Client) DataNode(id uint64) (*NodeInfo, error) {
return nil, nil
}
func (c *Client) DataNodes() ([]NodeInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data.DataNodes, nil
}
func (c *Client) DeleteDataNode(nodeID uint64) error {
return nil
}
func (c *Client) MetaNodes() ([]NodeInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data.MetaNodes, nil
}
// MetaNodeByAddr returns the meta node whose HTTP host equals addr, or nil
// if no known meta node has that address. The returned pointer refers to a
// copy of the node info.
func (c *Client) MetaNodeByAddr(addr string) *NodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for i := range c.data.MetaNodes {
		if c.data.MetaNodes[i].Host == addr {
			// Return a pointer to a copy, not into the shared slice.
			node := c.data.MetaNodes[i]
			return &node
		}
	}
	return nil
}
// Database returns the info for the named database, or
// influxdb.ErrDatabaseNotFound if no database with that name exists in the
// cached meta data.
func (c *Client) Database(name string) (*DatabaseInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for _, d := range c.data.Databases {
		if d.Name == name {
			// d is a copy of the slice element, so the returned pointer does
			// not alias the cached data.
			return &d, nil
		}
	}

	return nil, influxdb.ErrDatabaseNotFound(name)
}
func (c *Client) Databases() ([]DatabaseInfo, error) {
@ -31,6 +117,27 @@ func (c *Client) Databases() ([]DatabaseInfo, error) {
}
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
cmd := &internal.CreateDatabaseCommand{
Name: proto.String(name),
}
err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd)
if err != nil {
return nil, err
}
return c.Database(name)
}
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error) {
return nil, nil
}
func (c *Client) DropDatabase(name string) error {
return nil
}
func (c *Client) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
return nil, nil
}
@ -50,6 +157,10 @@ func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
return nil
}
func (c *Client) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
return nil
}
func (c *Client) IsLeader() bool {
return false
}
@ -66,6 +177,34 @@ func (c *Client) User(name string) (*UserInfo, error) {
return nil, nil
}
func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error) {
return nil, nil
}
func (c *Client) UpdateUser(name, password string) error {
return nil
}
func (c *Client) DropUser(name string) error {
return nil
}
func (c *Client) SetPrivilege(username, database string, p influxql.Privilege) error {
return nil
}
func (c *Client) SetAdminPrivilege(username string, admin bool) error {
return nil
}
func (c *Client) UserPrivileges(username string) (map[string]influxql.Privilege, error) {
return nil, nil
}
func (c *Client) UserPrivilege(username, database string) (*influxql.Privilege, error) {
return nil, nil
}
func (c *Client) AdminUserExists() (bool, error) {
return false, nil
}
@ -106,14 +245,259 @@ func (c *Client) ShardOwner(shardID uint64) (string, string, *ShardGroupInfo) {
return "", "", nil
}
func (c *Client) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
// JoinMetaServer will add the passed in tcpAddr to the raft peers and add a
// MetaNode to the metastore. It tries each known meta server in turn,
// following a leader redirect when one is returned, until a server accepts
// the join.
func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error {
	node := &NodeInfo{
		Host:    httpAddr,
		TCPHost: tcpAddr,
	}
	b, err := json.Marshal(node)
	if err != nil {
		return err
	}

	currentServer := 0
	redirectServer := ""
	for {
		// get the server to try to join against
		var url string
		if redirectServer != "" {
			url = redirectServer
		} else {
			c.mu.RLock()
			if currentServer >= len(c.metaServers) {
				currentServer = 0
			}
			server := c.metaServers[currentServer]
			c.mu.RUnlock()
			url = c.url(server) + "/join"
		}

		resp, err := http.Post(url, "application/json", bytes.NewBuffer(b))
		if err != nil {
			// This server (or the redirect target) is unreachable: drop any
			// stale redirect, move on to the next server, and back off so we
			// don't busy-loop against a down cluster.
			redirectServer = ""
			currentServer++
			time.Sleep(errSleep)
			continue
		}
		resp.Body.Close()

		// A temporary redirect points us at the current raft leader.
		if resp.StatusCode == http.StatusTemporaryRedirect {
			redirectServer = resp.Header.Get("Location")
			continue
		}

		return nil
	}
}
// CreateMetaNode issues a CreateMetaNodeCommand against the cluster,
// registering a meta node reachable at the given HTTP and TCP addresses.
// The command is retried across all known meta servers.
func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) error {
cmd := &internal.CreateMetaNodeCommand{
HTTPAddr: proto.String(httpAddr),
TCPAddr: proto.String(tcpAddr),
}
return c.retryUntilExec(internal.Command_CreateMetaNodeCommand, internal.E_CreateMetaNodeCommand_Command, cmd)
}
// DeleteMetaNode issues a DeleteMetaNodeCommand to remove the meta node with
// the given id from the cluster. The command is retried across all known
// meta servers.
func (c *Client) DeleteMetaNode(id uint64) error {
cmd := &internal.DeleteMetaNodeCommand{
ID: proto.Uint64(id),
}
return c.retryUntilExec(internal.Command_DeleteMetaNodeCommand, internal.E_DeleteMetaNodeCommand_Command, cmd)
}
func (c *Client) CreateContinuousQuery(database, name, query string) error {
return nil
}
func (c *Client) WaitForDataChanged() error {
func (c *Client) DropContinuousQuery(database, name string) error {
return nil
}
func (c *Client) CreateSubscription(database, rp, name, mode string, destinations []string) error {
return nil
}
func (c *Client) DropSubscription(database, rp, name string) error {
return nil
}
func (c *Client) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
return c.executor.ExecuteStatement(stmt)
}
// WaitForDataChanged will return a channel that will get closed when
// the metastore data has changed
func (c *Client) WaitForDataChanged() chan struct{} {
c.mu.RLock()
defer c.mu.RUnlock()
return c.changed
}
func (c *Client) MarshalBinary() ([]byte, error) {
return nil, nil
}
func (c *Client) MetaServers() []string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.metaServers
}
func (c *Client) index() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data.Index
}
// retryUntilExec will attempt the command on each of the metaservers and return on the first success
func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
var err error
var index uint64
for _, s := range c.MetaServers() {
index, err = c.exec(s, typ, desc, value)
if err == nil {
// Block until our local cache has caught up to the raft index at
// which the command was applied, so subsequent reads see the change.
c.waitForIndex(index)
return nil
}
}
// Every server failed; report the last error seen.
return err
}
// exec marshals the typed command (attached as a protobuf extension) and
// POSTs it to a single meta server's /execute endpoint. On success it
// returns the raft index at which the command was applied.
func (c *Client) exec(addr string, typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) (index uint64, err error) {
// Create command.
cmd := &internal.Command{Type: &typ}
// A failure to attach the extension is a programming error (mismatched
// type/descriptor), not a runtime condition, so panic rather than return.
if err := proto.SetExtension(cmd, desc, value); err != nil {
panic(err)
}
b, err := proto.Marshal(cmd)
if err != nil {
return 0, err
}
// execute against the metaserver
url := fmt.Sprintf("://%s/execute", addr)
if c.tls {
url = "https" + url
} else {
url = "http" + url
}
resp, err := http.Post(url, "application/octet-stream", bytes.NewBuffer(b))
if err != nil {
return 0, err
}
defer resp.Body.Close()
// read the response
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected result:\n\texp: %d\n\tgot: %d\n", http.StatusOK, resp.StatusCode)
}
res := &internal.Response{}
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}
if err := proto.Unmarshal(b, res); err != nil {
return 0, err
}
// The store can reject a command even over a 200 response; surface that
// application-level error to the caller.
es := res.GetError()
if es != "" {
return 0, fmt.Errorf("exec err: %s", es)
}
return res.GetIndex(), nil
}
// waitForIndex blocks until the locally cached meta data has a raft index of
// at least idx. It relies on pollForUpdates closing c.changed whenever a
// newer snapshot is installed.
func (c *Client) waitForIndex(idx uint64) {
for {
c.mu.RLock()
if c.data.Index >= idx {
c.mu.RUnlock()
return
}
// Capture the current change channel before unlocking so we cannot miss
// a notification that fires between the check and the receive.
ch := c.changed
c.mu.RUnlock()
<-ch
}
}
// pollForUpdates runs in its own goroutine (started by Open) and repeatedly
// fetches a snapshot newer than the one we hold, installing it and notifying
// waiters. NOTE(review): there is no exit path here — confirm whether this
// goroutine is meant to observe c.closing.
func (c *Client) pollForUpdates() {
for {
data := c.retryUntilSnapshot(c.index())
// update the data and notify of the change
c.mu.Lock()
idx := c.data.Index
c.data = data
if idx < data.Index {
// Closing the channel wakes all current waiters; a fresh channel is
// created for the next round of waiters.
close(c.changed)
c.changed = make(chan struct{})
}
c.mu.Unlock()
}
}
// getSnapshot fetches a snapshot of the meta data from a single meta server,
// requesting data newer than the given index, and unmarshals it into a Data.
func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {
	resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// A non-200 body is not a snapshot; fail before trying to unmarshal it.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s getting snapshot from %s", resp.Status, server)
	}

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	data := &Data{}
	if err := data.UnmarshalBinary(b); err != nil {
		return nil, err
	}

	return data, nil
}
// url returns the http(s) base URL for the given meta server address,
// choosing the scheme from the client's TLS setting.
func (c *Client) url(server string) string {
	scheme := "http"
	if c.tls {
		scheme = "https"
	}
	return fmt.Sprintf("%s://%s", scheme, server)
}
// retryUntilSnapshot cycles through the known meta servers until one returns
// a snapshot, sleeping errSleep between failed attempts. It does not return
// until a snapshot is obtained.
func (c *Client) retryUntilSnapshot(idx uint64) *Data {
currentServer := 0
for {
// get the index to look from and the server to poll
c.mu.RLock()
// Wrap around to the first server once we've tried them all.
if currentServer >= len(c.metaServers) {
currentServer = 0
}
server := c.metaServers[currentServer]
c.mu.RUnlock()
data, err := c.getSnapshot(server, idx)
if err == nil {
return data
}
// Log and advance to the next server after a short pause.
c.logger.Printf("failure getting snapshot from %s: %s", server, err.Error())
currentServer += 1
time.Sleep(errSleep)
continue
}
}

View File

@ -17,8 +17,8 @@ const (
// DefaultRaftBindAddress is the default address to bind to.
DefaultRaftBindAddress = ":8088"
// DefaultHTTPdBindAddress is the default address to bind to.
DefaultHTTPdBindAddress = ":8091"
// DefaultHTTPBindAddress is the default address to bind the API to.
DefaultHTTPBindAddress = ":8091"
// DefaultHeartbeatTimeout is the default heartbeat timeout for the store.
DefaultHeartbeatTimeout = 1000 * time.Millisecond
@ -41,14 +41,20 @@ const (
// Config represents the meta configuration.
type Config struct {
Enabled bool `toml:"enabled"`
Dir string `toml:"dir"`
Hostname string `toml:"hostname"`
RaftBindAddress string `toml:"raft-bind-address"`
HTTPdBindAddress string `toml:"httpd-bind-address"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
Peers []string `toml:"-"`
Enabled bool `toml:"enabled"`
Dir string `toml:"dir"`
Hostname string `toml:"hostname"`
// this is deprecated. Should use the address from run/config.go
BindAddress string `toml:"bind-address"`
// HTTPBindAddress is the bind address for the metaservice HTTP API
HTTPBindAddress string `toml:"http-bind-address"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
// JoinPeers if specified gives other metastore servers to join this server to the cluster
JoinPeers []string `toml:"-"`
RetentionAutoCreate bool `toml:"retention-autocreate"`
ElectionTimeout toml.Duration `toml:"election-timeout"`
HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"`
@ -63,9 +69,10 @@ type Config struct {
// NewConfig builds a new configuration with default values.
func NewConfig() *Config {
return &Config{
Enabled: true, // enabled by default
Hostname: DefaultHostname,
RaftBindAddress: DefaultRaftBindAddress,
HTTPdBindAddress: DefaultHTTPdBindAddress,
BindAddress: DefaultRaftBindAddress,
HTTPBindAddress: DefaultHTTPBindAddress,
RetentionAutoCreate: true,
ElectionTimeout: toml.Duration(DefaultElectionTimeout),
HeartbeatTimeout: toml.Duration(DefaultHeartbeatTimeout),

View File

@ -5,7 +5,7 @@ import (
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/meta"
)
func TestConfig_Parse(t *testing.T) {

View File

@ -28,7 +28,8 @@ type Data struct {
Term uint64 // associated raft term
Index uint64 // associated raft index
ClusterID uint64
Nodes []NodeInfo
MetaNodes []NodeInfo
DataNodes []NodeInfo
Databases []DatabaseInfo
Users []UserInfo
@ -37,36 +38,28 @@ type Data struct {
MaxShardID uint64
}
// Node returns a node by id.
func (data *Data) Node(id uint64) *NodeInfo {
for i := range data.Nodes {
if data.Nodes[i].ID == id {
return &data.Nodes[i]
// DataNode returns a node by id.
func (data *Data) DataNode(id uint64) *NodeInfo {
for i := range data.DataNodes {
if data.DataNodes[i].ID == id {
return &data.DataNodes[i]
}
}
return nil
}
// NodeByHost returns a node by hostname.
func (data *Data) NodeByHost(host string) *NodeInfo {
for i := range data.Nodes {
if data.Nodes[i].Host == host {
return &data.Nodes[i]
}
}
return nil
}
// CreateNode adds a node to the metadata.
func (data *Data) CreateNode(host string) error {
// CreateDataNode adds a node to the metadata.
func (data *Data) CreateDataNode(host string) error {
// Ensure a node with the same host doesn't already exist.
if data.NodeByHost(host) != nil {
return ErrNodeExists
for _, n := range data.DataNodes {
if n.Host == host {
return ErrNodeExists
}
}
// Append new node.
data.MaxNodeID++
data.Nodes = append(data.Nodes, NodeInfo{
data.DataNodes = append(data.DataNodes, NodeInfo{
ID: data.MaxNodeID,
Host: host,
})
@ -75,40 +68,11 @@ func (data *Data) CreateNode(host string) error {
}
// DeleteNode removes a node from the metadata.
func (data *Data) DeleteNode(id uint64, force bool) error {
func (data *Data) DeleteDataNode(id uint64) error {
// Node has to be larger than 0 to be real
if id == 0 {
return ErrNodeIDRequired
}
// Is this a valid node?
nodeInfo := data.Node(id)
if nodeInfo == nil {
return ErrNodeNotFound
}
// Am I the only node? If so, nothing to do
if len(data.Nodes) == 1 {
return ErrNodeUnableToDropFinalNode
}
// Determine if there are any any non-replicated nodes and force was not specified
if !force {
for _, d := range data.Databases {
for _, rp := range d.RetentionPolicies {
// ignore replicated retention policies
if rp.ReplicaN > 1 {
continue
}
for _, sg := range rp.ShardGroups {
for _, s := range sg.Shards {
if s.OwnedBy(id) && len(s.Owners) == 1 {
return ErrShardNotReplicated
}
}
}
}
}
}
// Remove node id from all shard infos
for di, d := range data.Databases {
@ -131,14 +95,71 @@ func (data *Data) DeleteNode(id uint64, force bool) error {
// Remove this node from the in memory nodes
var nodes []NodeInfo
for _, n := range data.Nodes {
for _, n := range data.DataNodes {
if n.ID == id {
continue
}
nodes = append(nodes, n)
}
data.Nodes = nodes
if len(nodes) == len(data.DataNodes) {
return ErrNodeNotFound
}
data.DataNodes = nodes
return nil
}
// MetaNode returns a node by id.
func (data *Data) MetaNode(id uint64) *NodeInfo {
for i := range data.MetaNodes {
if data.MetaNodes[i].ID == id {
return &data.MetaNodes[i]
}
}
return nil
}
// CreateMetaNode will add a new meta node to the metastore
func (data *Data) CreateMetaNode(httpAddr, tcpAddr string) error {
// Ensure a node with the same host doesn't already exist.
for _, n := range data.MetaNodes {
if n.Host == httpAddr {
return ErrNodeExists
}
}
// Append new node.
// NOTE(review): MaxNodeID is the same counter CreateDataNode increments,
// so meta and data nodes draw IDs from one shared sequence.
data.MaxNodeID++
data.MetaNodes = append(data.MetaNodes, NodeInfo{
ID: data.MaxNodeID,
Host: httpAddr,
TCPHost: tcpAddr,
})
return nil
}
// DeleteMetaNode will remove the meta node from the store
func (data *Data) DeleteMetaNode(id uint64) error {
// Node has to be larger than 0 to be real
if id == 0 {
return ErrNodeIDRequired
}
// Rebuild the slice without the target node.
var nodes []NodeInfo
for _, n := range data.MetaNodes {
if n.ID == id {
continue
}
nodes = append(nodes, n)
}
// If nothing was filtered out, no node with that ID existed.
if len(nodes) == len(data.MetaNodes) {
return ErrNodeNotFound
}
data.MetaNodes = nodes
return nil
}
@ -355,7 +376,7 @@ func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
// Ensure there are nodes in the metadata.
if len(data.Nodes) == 0 {
if len(data.DataNodes) == 0 {
return ErrNodesRequired
}
@ -376,14 +397,14 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
replicaN := rpi.ReplicaN
if replicaN == 0 {
replicaN = 1
} else if replicaN > len(data.Nodes) {
replicaN = len(data.Nodes)
} else if replicaN > len(data.DataNodes) {
replicaN = len(data.DataNodes)
}
// Determine shard count by node count divided by replication factor.
// This will ensure nodes will get distributed across nodes evenly and
// replicated the correct number of times.
shardN := len(data.Nodes) / replicaN
shardN := len(data.DataNodes) / replicaN
// Create the shard group.
data.MaxShardGroupID++
@ -401,11 +422,11 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(data.Index % uint64(len(data.Nodes)))
nodeIndex := int(data.Index % uint64(len(data.DataNodes)))
for i := range sgi.Shards {
si := &sgi.Shards[i]
for j := 0; j < replicaN; j++ {
nodeID := data.Nodes[nodeIndex%len(data.Nodes)].ID
nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID
si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
nodeIndex++
}
@ -632,10 +653,17 @@ func (data *Data) Clone() *Data {
other := *data
// Copy nodes.
if data.Nodes != nil {
other.Nodes = make([]NodeInfo, len(data.Nodes))
for i := range data.Nodes {
other.Nodes[i] = data.Nodes[i].clone()
if data.DataNodes != nil {
other.DataNodes = make([]NodeInfo, len(data.DataNodes))
for i := range data.DataNodes {
other.DataNodes[i] = data.DataNodes[i].clone()
}
}
if data.MetaNodes != nil {
other.MetaNodes = make([]NodeInfo, len(data.MetaNodes))
for i := range data.MetaNodes {
other.MetaNodes[i] = data.MetaNodes[i].clone()
}
}
@ -670,9 +698,14 @@ func (data *Data) marshal() *internal.Data {
MaxShardID: proto.Uint64(data.MaxShardID),
}
pb.Nodes = make([]*internal.NodeInfo, len(data.Nodes))
for i := range data.Nodes {
pb.Nodes[i] = data.Nodes[i].marshal()
pb.DataNodes = make([]*internal.NodeInfo, len(data.DataNodes))
for i := range data.DataNodes {
pb.DataNodes[i] = data.DataNodes[i].marshal()
}
pb.MetaNodes = make([]*internal.NodeInfo, len(data.MetaNodes))
for i := range data.MetaNodes {
pb.MetaNodes[i] = data.MetaNodes[i].marshal()
}
pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases))
@ -698,9 +731,22 @@ func (data *Data) unmarshal(pb *internal.Data) {
data.MaxShardGroupID = pb.GetMaxShardGroupID()
data.MaxShardID = pb.GetMaxShardID()
data.Nodes = make([]NodeInfo, len(pb.GetNodes()))
for i, x := range pb.GetNodes() {
data.Nodes[i].unmarshal(x)
// TODO: Nodes is deprecated. This is being left here to make migration from 0.9.x to 0.10.0 possible
if len(pb.GetNodes()) > 0 {
data.DataNodes = make([]NodeInfo, len(pb.GetNodes()))
for i, x := range pb.GetNodes() {
data.DataNodes[i].unmarshal(x)
}
} else {
data.DataNodes = make([]NodeInfo, len(pb.GetDataNodes()))
for i, x := range pb.GetDataNodes() {
data.DataNodes[i].unmarshal(x)
}
}
data.MetaNodes = make([]NodeInfo, len(pb.GetMetaNodes()))
for i, x := range pb.GetMetaNodes() {
data.MetaNodes[i].unmarshal(x)
}
data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
@ -731,8 +777,9 @@ func (data *Data) UnmarshalBinary(buf []byte) error {
// NodeInfo represents information about a single node in the cluster.
type NodeInfo struct {
ID uint64
Host string
ID uint64
Host string
TCPHost string
}
// clone returns a deep copy of ni.
@ -743,6 +790,7 @@ func (ni NodeInfo) marshal() *internal.NodeInfo {
pb := &internal.NodeInfo{}
pb.ID = proto.Uint64(ni.ID)
pb.Host = proto.String(ni.Host)
pb.TCPHost = proto.String(ni.TCPHost)
return pb
}
@ -750,6 +798,7 @@ func (ni NodeInfo) marshal() *internal.NodeInfo {
func (ni *NodeInfo) unmarshal(pb *internal.NodeInfo) {
ni.ID = pb.GetID()
ni.Host = pb.GetHost()
ni.TCPHost = pb.GetTCPHost()
}
// NodeInfos is a slice of NodeInfo used for sorting

View File

@ -2,6 +2,7 @@ package meta
import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
@ -19,10 +20,6 @@ import (
"github.com/influxdb/influxdb/uuid"
)
// execMagic is the first 4 bytes sent to a remote exec connection to verify
// that it is coming from a remote exec client connection.
const execMagic = "EXEC"
// handler represents an HTTP handler for the meta service.
type handler struct {
config *Config
@ -34,10 +31,11 @@ type handler struct {
store interface {
afterIndex(index uint64) <-chan struct{}
index() uint64
isLeader() bool
leader() string
leaderHTTP() string
snapshot() (*Data, error)
apply(b []byte) error
join(n *NodeInfo) error
}
}
@ -90,6 +88,41 @@ func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
return
}
fmt.Println("serveExec: ", r.Host)
if r.URL.Path == "/join" {
n := &NodeInfo{}
if err := json.Unmarshal(body, n); err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
err := h.store.join(n)
if err == raft.ErrNotLeader {
l := h.store.leaderHTTP()
fmt.Println("redirecting to leader:", l)
if l == "" {
// No cluster leader. Client will have to try again later.
h.httpError(errors.New("no leader"), w, http.StatusServiceUnavailable)
return
}
scheme := "http://"
if h.config.HTTPSEnabled {
scheme = "https://"
}
l = scheme + l + "/join"
fmt.Println("FOO: ", l)
http.Redirect(w, r, l, http.StatusTemporaryRedirect)
return
}
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
return
}
// Make sure it's a valid command.
if err := validateCommand(body); err != nil {
h.httpError(err, w, http.StatusBadRequest)
@ -101,14 +134,20 @@ func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
if err := h.store.apply(body); err != nil {
// If we aren't the leader, redirect client to the leader.
if err == raft.ErrNotLeader {
l := h.store.leader()
l := h.store.leaderHTTP()
if l == "" {
// No cluster leader. Client will have to try again later.
h.httpError(errors.New("no leader"), w, http.StatusServiceUnavailable)
return
}
l = r.URL.Scheme + "//" + l + "/execute"
http.Redirect(w, r, l, http.StatusFound)
scheme := "http://"
if h.config.HTTPSEnabled {
scheme = "https://"
}
l = scheme + l + "/execute"
fmt.Println("FOO: ", l)
http.Redirect(w, r, l, http.StatusTemporaryRedirect)
return
}

View File

@ -43,15 +43,12 @@ It has these top-level messages:
CreateSubscriptionCommand
DropSubscriptionCommand
RemovePeerCommand
CreateMetaNodeCommand
CreateDataNodeCommand
UpdateDataNodeCommand
DeleteMetaNodeCommand
DeleteDataNodeCommand
Response
ResponseHeader
ErrorResponse
FetchDataRequest
FetchDataResponse
JoinRequest
JoinResponse
PromoteRaftRequest
PromoteRaftResponse
*/
package internal
@ -64,45 +61,6 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// RPCType identifies the kind of higher-level RPC message exchanged between
// cluster nodes (as opposed to raft log commands, which use Command_Type).
type RPCType int32

const (
	RPCType_Error       RPCType = 1
	RPCType_FetchData   RPCType = 2
	RPCType_Join        RPCType = 3
	RPCType_PromoteRaft RPCType = 4
)

// RPCType_name maps enum values to their protobuf names.
var RPCType_name = map[int32]string{
	1: "Error",
	2: "FetchData",
	3: "Join",
	4: "PromoteRaft",
}

// RPCType_value maps protobuf names back to enum values.
var RPCType_value = map[string]int32{
	"Error":       1,
	"FetchData":   2,
	"Join":        3,
	"PromoteRaft": 4,
}

// Enum returns a pointer to a copy of x, as the protobuf API requires for
// optional enum fields.
func (x RPCType) Enum() *RPCType {
	p := new(RPCType)
	*p = x
	return p
}

// String returns the protobuf name of x.
func (x RPCType) String() string {
	return proto.EnumName(RPCType_name, int32(x))
}

// UnmarshalJSON decodes a JSON-encoded enum name or number into x.
func (x *RPCType) UnmarshalJSON(data []byte) error {
	value, err := proto.UnmarshalJSONEnum(RPCType_value, data, "RPCType")
	if err != nil {
		return err
	}
	*x = RPCType(value)
	return nil
}
type Command_Type int32
const (
@ -128,6 +86,11 @@ const (
Command_CreateSubscriptionCommand Command_Type = 21
Command_DropSubscriptionCommand Command_Type = 22
Command_RemovePeerCommand Command_Type = 23
Command_CreateMetaNodeCommand Command_Type = 24
Command_CreateDataNodeCommand Command_Type = 25
Command_UpdateDataNodeCommand Command_Type = 26
Command_DeleteMetaNodeCommand Command_Type = 27
Command_DeleteDataNodeCommand Command_Type = 28
)
var Command_Type_name = map[int32]string{
@ -153,6 +116,11 @@ var Command_Type_name = map[int32]string{
21: "CreateSubscriptionCommand",
22: "DropSubscriptionCommand",
23: "RemovePeerCommand",
24: "CreateMetaNodeCommand",
25: "CreateDataNodeCommand",
26: "UpdateDataNodeCommand",
27: "DeleteMetaNodeCommand",
28: "DeleteDataNodeCommand",
}
var Command_Type_value = map[string]int32{
"CreateNodeCommand": 1,
@ -177,6 +145,11 @@ var Command_Type_value = map[string]int32{
"CreateSubscriptionCommand": 21,
"DropSubscriptionCommand": 22,
"RemovePeerCommand": 23,
"CreateMetaNodeCommand": 24,
"CreateDataNodeCommand": 25,
"UpdateDataNodeCommand": 26,
"DeleteMetaNodeCommand": 27,
"DeleteDataNodeCommand": 28,
}
func (x Command_Type) Enum() *Command_Type {
@ -197,16 +170,19 @@ func (x *Command_Type) UnmarshalJSON(data []byte) error {
}
type Data struct {
Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"`
// added for 0.10.0
DataNodes []*NodeInfo `protobuf:"bytes,10,rep,name=DataNodes" json:"DataNodes,omitempty"`
MetaNodes []*NodeInfo `protobuf:"bytes,11,rep,name=MetaNodes" json:"MetaNodes,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Data) Reset() { *m = Data{} }
@ -276,9 +252,24 @@ func (m *Data) GetMaxShardID() uint64 {
return 0
}
// GetDataNodes returns the DataNodes field; it is safe to call on a nil
// receiver, in which case it returns nil.
func (m *Data) GetDataNodes() []*NodeInfo {
	if m == nil {
		return nil
	}
	return m.DataNodes
}
// GetMetaNodes returns the MetaNodes field; it is safe to call on a nil
// receiver, in which case it returns nil.
func (m *Data) GetMetaNodes() []*NodeInfo {
	if m == nil {
		return nil
	}
	return m.MetaNodes
}
// NodeInfo describes a single node in the cluster: its unique ID, its HTTP
// host address, and (optionally) its TCP host address.
type NodeInfo struct {
	ID   *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
	Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
	// TCPHost is optional so that metadata serialized by versions that
	// predate this field still deserializes cleanly.
	TCPHost          *string `protobuf:"bytes,3,opt,name=TCPHost" json:"TCPHost,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}
@ -300,6 +291,13 @@ func (m *NodeInfo) GetHost() string {
return ""
}
// GetTCPHost returns the TCPHost field, or "" when the field is unset or the
// receiver is nil.
func (m *NodeInfo) GetTCPHost() string {
	if m == nil || m.TCPHost == nil {
		return ""
	}
	return *m.TCPHost
}
type DatabaseInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
DefaultRetentionPolicy *string `protobuf:"bytes,2,req,name=DefaultRetentionPolicy" json:"DefaultRetentionPolicy,omitempty"`
@ -643,6 +641,8 @@ func (m *Command) GetType() Command_Type {
return Command_CreateNodeCommand
}
// This isn't used in >= 0.10.0. Kept around for upgrade purposes. Instead
// look at CreateDataNodeCommand and CreateMetaNodeCommand
type CreateNodeCommand struct {
Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"`
Rand *uint64 `protobuf:"varint,2,req,name=Rand" json:"Rand,omitempty"`
@ -1380,7 +1380,7 @@ var E_DropSubscriptionCommand_Command = &proto.ExtensionDesc{
}
type RemovePeerCommand struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
ID *uint64 `protobuf:"varint,1,opt,name=ID" json:"ID,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=Addr" json:"Addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1411,6 +1411,158 @@ var E_RemovePeerCommand_Command = &proto.ExtensionDesc{
Tag: "bytes,123,opt,name=command",
}
// CreateMetaNodeCommand is the raft command that adds a meta node, identified
// by its HTTP and TCP addresses, to the cluster metadata.
type CreateMetaNodeCommand struct {
	HTTPAddr         *string `protobuf:"bytes,1,req,name=HTTPAddr" json:"HTTPAddr,omitempty"`
	TCPAddr          *string `protobuf:"bytes,2,req,name=TCPAddr" json:"TCPAddr,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *CreateMetaNodeCommand) Reset()         { *m = CreateMetaNodeCommand{} }
func (m *CreateMetaNodeCommand) String() string { return proto.CompactTextString(m) }
func (*CreateMetaNodeCommand) ProtoMessage()    {}

// GetHTTPAddr returns the HTTPAddr field, or "" if unset.
func (m *CreateMetaNodeCommand) GetHTTPAddr() string {
	if m != nil && m.HTTPAddr != nil {
		return *m.HTTPAddr
	}
	return ""
}

// GetTCPAddr returns the TCPAddr field, or "" if unset.
func (m *CreateMetaNodeCommand) GetTCPAddr() string {
	if m != nil && m.TCPAddr != nil {
		return *m.TCPAddr
	}
	return ""
}

// E_CreateMetaNodeCommand_Command embeds this command in the Command
// envelope as protobuf extension field 124.
var E_CreateMetaNodeCommand_Command = &proto.ExtensionDesc{
	ExtendedType:  (*Command)(nil),
	ExtensionType: (*CreateMetaNodeCommand)(nil),
	Field:         124,
	Name:          "internal.CreateMetaNodeCommand.command",
	Tag:           "bytes,124,opt,name=command",
}
// CreateDataNodeCommand is the raft command that adds a data node, identified
// by its HTTP and TCP addresses, to the cluster metadata.
type CreateDataNodeCommand struct {
	HTTPAddr         *string `protobuf:"bytes,1,req,name=HTTPAddr" json:"HTTPAddr,omitempty"`
	TCPAddr          *string `protobuf:"bytes,2,req,name=TCPAddr" json:"TCPAddr,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *CreateDataNodeCommand) Reset()         { *m = CreateDataNodeCommand{} }
func (m *CreateDataNodeCommand) String() string { return proto.CompactTextString(m) }
func (*CreateDataNodeCommand) ProtoMessage()    {}

// GetHTTPAddr returns the HTTPAddr field, or "" if unset.
func (m *CreateDataNodeCommand) GetHTTPAddr() string {
	if m != nil && m.HTTPAddr != nil {
		return *m.HTTPAddr
	}
	return ""
}

// GetTCPAddr returns the TCPAddr field, or "" if unset.
func (m *CreateDataNodeCommand) GetTCPAddr() string {
	if m != nil && m.TCPAddr != nil {
		return *m.TCPAddr
	}
	return ""
}

// E_CreateDataNodeCommand_Command embeds this command in the Command
// envelope as protobuf extension field 125.
var E_CreateDataNodeCommand_Command = &proto.ExtensionDesc{
	ExtendedType:  (*Command)(nil),
	ExtensionType: (*CreateDataNodeCommand)(nil),
	Field:         125,
	Name:          "internal.CreateDataNodeCommand.command",
	Tag:           "bytes,125,opt,name=command",
}
// UpdateDataNodeCommand is the raft command that updates the HTTP and TCP
// host addresses of an existing data node, identified by ID.
type UpdateDataNodeCommand struct {
	ID               *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
	Host             *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
	TCPHost          *string `protobuf:"bytes,3,req,name=TCPHost" json:"TCPHost,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *UpdateDataNodeCommand) Reset()         { *m = UpdateDataNodeCommand{} }
func (m *UpdateDataNodeCommand) String() string { return proto.CompactTextString(m) }
func (*UpdateDataNodeCommand) ProtoMessage()    {}

// GetID returns the ID field, or 0 if unset.
func (m *UpdateDataNodeCommand) GetID() uint64 {
	if m != nil && m.ID != nil {
		return *m.ID
	}
	return 0
}

// GetHost returns the Host field, or "" if unset.
func (m *UpdateDataNodeCommand) GetHost() string {
	if m != nil && m.Host != nil {
		return *m.Host
	}
	return ""
}

// GetTCPHost returns the TCPHost field, or "" if unset.
func (m *UpdateDataNodeCommand) GetTCPHost() string {
	if m != nil && m.TCPHost != nil {
		return *m.TCPHost
	}
	return ""
}

// E_UpdateDataNodeCommand_Command embeds this command in the Command
// envelope as protobuf extension field 126.
var E_UpdateDataNodeCommand_Command = &proto.ExtensionDesc{
	ExtendedType:  (*Command)(nil),
	ExtensionType: (*UpdateDataNodeCommand)(nil),
	Field:         126,
	Name:          "internal.UpdateDataNodeCommand.command",
	Tag:           "bytes,126,opt,name=command",
}
// DeleteMetaNodeCommand is the raft command that removes the meta node with
// the given ID from the cluster metadata.
type DeleteMetaNodeCommand struct {
	ID               *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *DeleteMetaNodeCommand) Reset()         { *m = DeleteMetaNodeCommand{} }
func (m *DeleteMetaNodeCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteMetaNodeCommand) ProtoMessage()    {}

// GetID returns the ID field, or 0 if unset.
func (m *DeleteMetaNodeCommand) GetID() uint64 {
	if m != nil && m.ID != nil {
		return *m.ID
	}
	return 0
}

// E_DeleteMetaNodeCommand_Command embeds this command in the Command
// envelope as protobuf extension field 127.
var E_DeleteMetaNodeCommand_Command = &proto.ExtensionDesc{
	ExtendedType:  (*Command)(nil),
	ExtensionType: (*DeleteMetaNodeCommand)(nil),
	Field:         127,
	Name:          "internal.DeleteMetaNodeCommand.command",
	Tag:           "bytes,127,opt,name=command",
}
// DeleteDataNodeCommand is the raft command that removes the data node with
// the given ID from the cluster metadata.
type DeleteDataNodeCommand struct {
	ID               *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *DeleteDataNodeCommand) Reset()         { *m = DeleteDataNodeCommand{} }
func (m *DeleteDataNodeCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteDataNodeCommand) ProtoMessage()    {}

// GetID returns the ID field, or 0 if unset.
func (m *DeleteDataNodeCommand) GetID() uint64 {
	if m != nil && m.ID != nil {
		return *m.ID
	}
	return 0
}

// E_DeleteDataNodeCommand_Command embeds this command in the Command
// envelope as protobuf extension field 128.
var E_DeleteDataNodeCommand_Command = &proto.ExtensionDesc{
	ExtendedType:  (*Command)(nil),
	ExtensionType: (*DeleteDataNodeCommand)(nil),
	Field:         128,
	Name:          "internal.DeleteDataNodeCommand.command",
	Tag:           "bytes,128,opt,name=command",
}
type Response struct {
OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
@ -1443,226 +1595,47 @@ func (m *Response) GetIndex() uint64 {
return 0
}
// ResponseHeader carries the common success/error status returned at the
// front of every RPC response message.
type ResponseHeader struct {
	OK               *bool   `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
	Error            *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *ResponseHeader) Reset()         { *m = ResponseHeader{} }
func (m *ResponseHeader) String() string { return proto.CompactTextString(m) }
func (*ResponseHeader) ProtoMessage()    {}

// GetOK returns the OK field, or false if unset.
func (m *ResponseHeader) GetOK() bool {
	if m != nil && m.OK != nil {
		return *m.OK
	}
	return false
}

// GetError returns the Error field, or "" if unset.
func (m *ResponseHeader) GetError() string {
	if m != nil && m.Error != nil {
		return *m.Error
	}
	return ""
}

// ErrorResponse is the generic RPC reply sent when a request fails; the
// failure detail lives in its ResponseHeader.
type ErrorResponse struct {
	Header           *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
	XXX_unrecognized []byte          `json:"-"`
}

func (m *ErrorResponse) Reset()         { *m = ErrorResponse{} }
func (m *ErrorResponse) String() string { return proto.CompactTextString(m) }
func (*ErrorResponse) ProtoMessage()    {}

// GetHeader returns the Header field, or nil if unset.
func (m *ErrorResponse) GetHeader() *ResponseHeader {
	if m != nil {
		return m.Header
	}
	return nil
}
// FetchDataRequest asks a node for its meta data at or after the given
// index/term. When Blocking is true the request waits for newer data.
type FetchDataRequest struct {
	Index            *uint64 `protobuf:"varint,1,req,name=Index" json:"Index,omitempty"`
	Term             *uint64 `protobuf:"varint,2,req,name=Term" json:"Term,omitempty"`
	Blocking         *bool   `protobuf:"varint,3,opt,name=Blocking,def=0" json:"Blocking,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *FetchDataRequest) Reset()         { *m = FetchDataRequest{} }
func (m *FetchDataRequest) String() string { return proto.CompactTextString(m) }
func (*FetchDataRequest) ProtoMessage()    {}

// Default_FetchDataRequest_Blocking is the protobuf-declared default for the
// Blocking field.
const Default_FetchDataRequest_Blocking bool = false

// GetIndex returns the Index field, or 0 if unset.
func (m *FetchDataRequest) GetIndex() uint64 {
	if m != nil && m.Index != nil {
		return *m.Index
	}
	return 0
}

// GetTerm returns the Term field, or 0 if unset.
func (m *FetchDataRequest) GetTerm() uint64 {
	if m != nil && m.Term != nil {
		return *m.Term
	}
	return 0
}

// GetBlocking returns the Blocking field, or its declared default (false)
// if unset.
func (m *FetchDataRequest) GetBlocking() bool {
	if m != nil && m.Blocking != nil {
		return *m.Blocking
	}
	return Default_FetchDataRequest_Blocking
}

// FetchDataResponse returns serialized meta data (Data) along with the
// index/term it corresponds to.
type FetchDataResponse struct {
	Header           *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
	Index            *uint64         `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
	Term             *uint64         `protobuf:"varint,3,req,name=Term" json:"Term,omitempty"`
	Data             []byte          `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
	XXX_unrecognized []byte          `json:"-"`
}

func (m *FetchDataResponse) Reset()         { *m = FetchDataResponse{} }
func (m *FetchDataResponse) String() string { return proto.CompactTextString(m) }
func (*FetchDataResponse) ProtoMessage()    {}

// GetHeader returns the Header field, or nil if unset.
func (m *FetchDataResponse) GetHeader() *ResponseHeader {
	if m != nil {
		return m.Header
	}
	return nil
}

// GetIndex returns the Index field, or 0 if unset.
func (m *FetchDataResponse) GetIndex() uint64 {
	if m != nil && m.Index != nil {
		return *m.Index
	}
	return 0
}

// GetTerm returns the Term field, or 0 if unset.
func (m *FetchDataResponse) GetTerm() uint64 {
	if m != nil && m.Term != nil {
		return *m.Term
	}
	return 0
}

// GetData returns the serialized meta data payload, or nil if unset.
func (m *FetchDataResponse) GetData() []byte {
	if m != nil {
		return m.Data
	}
	return nil
}
// JoinRequest asks the cluster to admit the node listening at Addr.
type JoinRequest struct {
	Addr             *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *JoinRequest) Reset()         { *m = JoinRequest{} }
func (m *JoinRequest) String() string { return proto.CompactTextString(m) }
func (*JoinRequest) ProtoMessage()    {}

// GetAddr returns the Addr field, or "" if unset.
func (m *JoinRequest) GetAddr() string {
	if m != nil && m.Addr != nil {
		return *m.Addr
	}
	return ""
}

// JoinResponse is the reply to a JoinRequest. It tells the joining node
// whether to participate in raft, which raft peers exist, and the node ID
// it was assigned.
type JoinResponse struct {
	Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
	// EnableRaft indicates that the requesting node should take part in
	// the raft cluster.
	EnableRaft *bool `protobuf:"varint,2,opt,name=EnableRaft" json:"EnableRaft,omitempty"`
	// RaftNodes are the raft peer addresses to use when joining as a raft
	// member; otherwise they identify the nodes currently running raft.
	RaftNodes []string `protobuf:"bytes,3,rep,name=RaftNodes" json:"RaftNodes,omitempty"`
	// NodeID is the node ID assigned to the requesting node.
	NodeID           *uint64 `protobuf:"varint,4,opt,name=NodeID" json:"NodeID,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (m *JoinResponse) Reset()         { *m = JoinResponse{} }
func (m *JoinResponse) String() string { return proto.CompactTextString(m) }
func (*JoinResponse) ProtoMessage()    {}

// GetHeader returns the Header field, or nil if unset.
func (m *JoinResponse) GetHeader() *ResponseHeader {
	if m != nil {
		return m.Header
	}
	return nil
}

// GetEnableRaft returns the EnableRaft field, or false if unset.
func (m *JoinResponse) GetEnableRaft() bool {
	if m != nil && m.EnableRaft != nil {
		return *m.EnableRaft
	}
	return false
}

// GetRaftNodes returns the RaftNodes field, or nil if unset.
func (m *JoinResponse) GetRaftNodes() []string {
	if m != nil {
		return m.RaftNodes
	}
	return nil
}

// GetNodeID returns the NodeID field, or 0 if unset.
func (m *JoinResponse) GetNodeID() uint64 {
	if m != nil && m.NodeID != nil {
		return *m.NodeID
	}
	return 0
}
// PromoteRaftRequest asks the node at Addr to become a raft member, giving
// it the current set of raft peers.
type PromoteRaftRequest struct {
	Addr             *string  `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"`
	RaftNodes        []string `protobuf:"bytes,2,rep,name=RaftNodes" json:"RaftNodes,omitempty"`
	XXX_unrecognized []byte   `json:"-"`
}

func (m *PromoteRaftRequest) Reset()         { *m = PromoteRaftRequest{} }
func (m *PromoteRaftRequest) String() string { return proto.CompactTextString(m) }
func (*PromoteRaftRequest) ProtoMessage()    {}

// GetAddr returns the Addr field, or "" if unset.
func (m *PromoteRaftRequest) GetAddr() string {
	if m != nil && m.Addr != nil {
		return *m.Addr
	}
	return ""
}

// GetRaftNodes returns the RaftNodes field, or nil if unset.
func (m *PromoteRaftRequest) GetRaftNodes() []string {
	if m != nil {
		return m.RaftNodes
	}
	return nil
}

// PromoteRaftResponse is the reply to a PromoteRaftRequest.
type PromoteRaftResponse struct {
	Header           *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
	Success          *bool           `protobuf:"varint,2,opt,name=Success" json:"Success,omitempty"`
	XXX_unrecognized []byte          `json:"-"`
}

func (m *PromoteRaftResponse) Reset()         { *m = PromoteRaftResponse{} }
func (m *PromoteRaftResponse) String() string { return proto.CompactTextString(m) }
func (*PromoteRaftResponse) ProtoMessage()    {}

// GetHeader returns the Header field, or nil if unset.
func (m *PromoteRaftResponse) GetHeader() *ResponseHeader {
	if m != nil {
		return m.Header
	}
	return nil
}

// GetSuccess returns the Success field, or false if unset.
func (m *PromoteRaftResponse) GetSuccess() bool {
	if m != nil && m.Success != nil {
		return *m.Success
	}
	return false
}
func init() {
proto.RegisterEnum("internal.RPCType", RPCType_name, RPCType_value)
proto.RegisterType((*Data)(nil), "internal.Data")
proto.RegisterType((*NodeInfo)(nil), "internal.NodeInfo")
proto.RegisterType((*DatabaseInfo)(nil), "internal.DatabaseInfo")
proto.RegisterType((*RetentionPolicyInfo)(nil), "internal.RetentionPolicyInfo")
proto.RegisterType((*ShardGroupInfo)(nil), "internal.ShardGroupInfo")
proto.RegisterType((*ShardInfo)(nil), "internal.ShardInfo")
proto.RegisterType((*SubscriptionInfo)(nil), "internal.SubscriptionInfo")
proto.RegisterType((*ShardOwner)(nil), "internal.ShardOwner")
proto.RegisterType((*ContinuousQueryInfo)(nil), "internal.ContinuousQueryInfo")
proto.RegisterType((*UserInfo)(nil), "internal.UserInfo")
proto.RegisterType((*UserPrivilege)(nil), "internal.UserPrivilege")
proto.RegisterType((*Command)(nil), "internal.Command")
proto.RegisterType((*CreateNodeCommand)(nil), "internal.CreateNodeCommand")
proto.RegisterType((*DeleteNodeCommand)(nil), "internal.DeleteNodeCommand")
proto.RegisterType((*CreateDatabaseCommand)(nil), "internal.CreateDatabaseCommand")
proto.RegisterType((*DropDatabaseCommand)(nil), "internal.DropDatabaseCommand")
proto.RegisterType((*CreateRetentionPolicyCommand)(nil), "internal.CreateRetentionPolicyCommand")
proto.RegisterType((*DropRetentionPolicyCommand)(nil), "internal.DropRetentionPolicyCommand")
proto.RegisterType((*SetDefaultRetentionPolicyCommand)(nil), "internal.SetDefaultRetentionPolicyCommand")
proto.RegisterType((*UpdateRetentionPolicyCommand)(nil), "internal.UpdateRetentionPolicyCommand")
proto.RegisterType((*CreateShardGroupCommand)(nil), "internal.CreateShardGroupCommand")
proto.RegisterType((*DeleteShardGroupCommand)(nil), "internal.DeleteShardGroupCommand")
proto.RegisterType((*CreateContinuousQueryCommand)(nil), "internal.CreateContinuousQueryCommand")
proto.RegisterType((*DropContinuousQueryCommand)(nil), "internal.DropContinuousQueryCommand")
proto.RegisterType((*CreateUserCommand)(nil), "internal.CreateUserCommand")
proto.RegisterType((*DropUserCommand)(nil), "internal.DropUserCommand")
proto.RegisterType((*UpdateUserCommand)(nil), "internal.UpdateUserCommand")
proto.RegisterType((*SetPrivilegeCommand)(nil), "internal.SetPrivilegeCommand")
proto.RegisterType((*SetDataCommand)(nil), "internal.SetDataCommand")
proto.RegisterType((*SetAdminPrivilegeCommand)(nil), "internal.SetAdminPrivilegeCommand")
proto.RegisterType((*UpdateNodeCommand)(nil), "internal.UpdateNodeCommand")
proto.RegisterType((*CreateSubscriptionCommand)(nil), "internal.CreateSubscriptionCommand")
proto.RegisterType((*DropSubscriptionCommand)(nil), "internal.DropSubscriptionCommand")
proto.RegisterType((*RemovePeerCommand)(nil), "internal.RemovePeerCommand")
proto.RegisterType((*CreateMetaNodeCommand)(nil), "internal.CreateMetaNodeCommand")
proto.RegisterType((*CreateDataNodeCommand)(nil), "internal.CreateDataNodeCommand")
proto.RegisterType((*UpdateDataNodeCommand)(nil), "internal.UpdateDataNodeCommand")
proto.RegisterType((*DeleteMetaNodeCommand)(nil), "internal.DeleteMetaNodeCommand")
proto.RegisterType((*DeleteDataNodeCommand)(nil), "internal.DeleteDataNodeCommand")
proto.RegisterType((*Response)(nil), "internal.Response")
proto.RegisterEnum("internal.Command_Type", Command_Type_name, Command_Type_value)
proto.RegisterExtension(E_CreateNodeCommand_Command)
proto.RegisterExtension(E_DeleteNodeCommand_Command)
@ -1686,4 +1659,9 @@ func init() {
proto.RegisterExtension(E_CreateSubscriptionCommand_Command)
proto.RegisterExtension(E_DropSubscriptionCommand_Command)
proto.RegisterExtension(E_RemovePeerCommand_Command)
proto.RegisterExtension(E_CreateMetaNodeCommand_Command)
proto.RegisterExtension(E_CreateDataNodeCommand_Command)
proto.RegisterExtension(E_UpdateDataNodeCommand_Command)
proto.RegisterExtension(E_DeleteMetaNodeCommand_Command)
proto.RegisterExtension(E_DeleteDataNodeCommand_Command)
}

View File

@ -18,11 +18,16 @@ message Data {
required uint64 MaxNodeID = 7;
required uint64 MaxShardGroupID = 8;
required uint64 MaxShardID = 9;
// added for 0.10.0
repeated NodeInfo DataNodes = 10;
repeated NodeInfo MetaNodes = 11;
}
message NodeInfo {
required uint64 ID = 1;
required string Host = 2;
optional string TCPHost = 3;
}
message DatabaseInfo {
@ -115,11 +120,18 @@ message Command {
CreateSubscriptionCommand = 21;
DropSubscriptionCommand = 22;
RemovePeerCommand = 23;
CreateMetaNodeCommand = 24;
CreateDataNodeCommand = 25;
UpdateDataNodeCommand = 26;
DeleteMetaNodeCommand = 27;
DeleteDataNodeCommand = 28;
}
required Type type = 1;
}
// This isn't used in >= 0.10.0. Kept around for upgrade purposes. Instead
// look at CreateDataNodeCommand and CreateMetaNodeCommand
message CreateNodeCommand {
extend Command {
optional CreateNodeCommand command = 101;
@ -301,77 +313,51 @@ message RemovePeerCommand {
extend Command {
optional RemovePeerCommand command = 123;
}
required uint64 ID = 1;
optional uint64 ID = 1;
required string Addr = 2;
}
// CreateMetaNodeCommand adds a meta node, identified by its HTTP and TCP
// addresses, to the cluster metadata.
message CreateMetaNodeCommand {
	extend Command {
		optional CreateMetaNodeCommand command = 124;
	}
	required string HTTPAddr = 1;
	required string TCPAddr = 2;
}

// CreateDataNodeCommand adds a data node, identified by its HTTP and TCP
// addresses, to the cluster metadata.
message CreateDataNodeCommand {
	extend Command {
		optional CreateDataNodeCommand command = 125;
	}
	required string HTTPAddr = 1;
	required string TCPAddr = 2;
}

// UpdateDataNodeCommand updates the host addresses of an existing data node,
// identified by ID.
message UpdateDataNodeCommand {
	extend Command {
		optional UpdateDataNodeCommand command = 126;
	}
	required uint64 ID = 1;
	required string Host = 2;
	required string TCPHost = 3;
}

// DeleteMetaNodeCommand removes the meta node with the given ID.
message DeleteMetaNodeCommand {
	extend Command {
		optional DeleteMetaNodeCommand command = 127;
	}
	required uint64 ID = 1;
}

// DeleteDataNodeCommand removes the data node with the given ID.
message DeleteDataNodeCommand {
	extend Command {
		optional DeleteDataNodeCommand command = 128;
	}
	required uint64 ID = 1;
}

// Response is the generic reply to an applied command, carrying the log
// index the command was applied at.
message Response {
	required bool OK = 1;
	optional string Error = 2;
	optional uint64 Index = 3;
}
//========================================================================
//
// RPC - higher-level cluster communication operations
//
//========================================================================
// RPCType identifies the kind of higher-level RPC message exchanged between
// cluster nodes.
enum RPCType {
	Error = 1;
	FetchData = 2;
	Join = 3;
	PromoteRaft = 4;
}

// ResponseHeader carries the common success/error status at the front of
// every RPC response.
message ResponseHeader {
	required bool OK = 1;
	optional string Error = 2;
}

// ErrorResponse is the generic RPC reply sent when a request fails.
message ErrorResponse {
	required ResponseHeader Header = 1;
}

// FetchDataRequest asks a node for its meta data at or after the given
// index/term; Blocking waits for newer data.
message FetchDataRequest {
	required uint64 Index = 1;
	required uint64 Term = 2;
	optional bool Blocking = 3 [default = false];
}

// FetchDataResponse returns serialized meta data with its index/term.
message FetchDataResponse {
	required ResponseHeader Header = 1;
	required uint64 Index = 2;
	required uint64 Term = 3;
	optional bytes Data = 4;
}

// JoinRequest asks the cluster to admit the node listening at Addr.
message JoinRequest {
	required string Addr = 1;
}

// JoinResponse replies to a JoinRequest with raft membership instructions.
message JoinResponse {
	required ResponseHeader Header = 1;

	// Indicates that this node should take part in the raft cluster.
	optional bool EnableRaft = 2;

	// The addresses of raft peers to use if joining as a raft member. If not joining
	// as a raft member, these are the nodes running raft.
	repeated string RaftNodes = 3;

	// The node ID assigned to the requesting node.
	optional uint64 NodeID = 4;
}

// PromoteRaftRequest asks the node at Addr to become a raft member.
message PromoteRaftRequest {
	required string Addr = 1;
	repeated string RaftNodes = 2;
}

// PromoteRaftResponse replies to a PromoteRaftRequest.
message PromoteRaftResponse {
	required ResponseHeader Header = 1;
	optional bool Success = 2;
}

View File

@ -26,29 +26,27 @@ const (
// raftState is a consensus strategy that uses a local raft implementation for
// consensus operations.
type raftState struct {
wg sync.WaitGroup
config *Config
closing chan struct{}
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
joinPeers []string
ln net.Listener
logger *log.Logger
remoteAddr net.Addr
path string
wg sync.WaitGroup
config *Config
closing chan struct{}
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
ln net.Listener
logger *log.Logger
path string
}
func newRaftState(c *Config, joinPeers []string) *raftState {
func newRaftState(c *Config) *raftState {
return &raftState{
config: c,
joinPeers: joinPeers,
config: c,
}
}
func (r *raftState) open(s *store) error {
func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) error {
r.ln = ln
r.closing = make(chan struct{})
// Setup raft configuration.
@ -66,15 +64,8 @@ func (r *raftState) open(s *store) error {
// If in the future we decide to call remove peer we have to re-evaluate how to handle this
config.ShutdownOnRemove = false
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(r.joinPeers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
}
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(r.ln, r.remoteAddr)
r.raftLayer = newRaftLayer(r.ln)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
@ -82,27 +73,41 @@ func (r *raftState) open(s *store) error {
// Create peer storage.
r.peerStore = raft.NewJSONPeers(r.path, r.transport)
// This server is joining the raft cluster for the first time if initializePeers are passed in
if len(initializePeers) > 0 {
if err := r.peerStore.SetPeers(initializePeers); err != nil {
return err
}
}
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
// For single-node clusters, we can update the raft peers before we start the cluster if the hostname
// has changed.
if config.EnableSingleNode {
if err := r.peerStore.SetPeers([]string{r.remoteAddr.String()}); err != nil {
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(peers) <= 1 {
fmt.Println("single node!")
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
// For single-node clusters, we can update the raft peers before we start the cluster if the hostname
// has changed.
if err := r.peerStore.SetPeers([]string{r.ln.Addr().String()}); err != nil {
return err
}
peers = []string{r.remoteAddr.String()}
peers = []string{r.ln.Addr().String()}
}
// If we have multiple nodes in the cluster, make sure our address is in the raft peers or
// we won't be able to boot into the cluster because the other peers will reject our new hostname. This
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, r.remoteAddr.String()) {
r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.remoteAddr.String(), r.path)
return fmt.Errorf("peers out of sync: %v not in %v", r.remoteAddr.String(), peers)
if len(peers) > 0 && !raft.PeerContained(peers, r.ln.Addr().String()) {
r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.ln.Addr().String(), r.path)
return fmt.Errorf("peers out of sync: %v not in %v", r.ln.Addr().String(), peers)
}
// Create the log store and stable store.
@ -176,22 +181,6 @@ func (r *raftState) close() error {
return nil
}
func (r *raftState) initialize() error {
// If we have committed entries then the store is already in the cluster.
if index, err := r.raftStore.LastIndex(); err != nil {
return fmt.Errorf("last index: %s", err)
} else if index > 0 {
return nil
}
// Force set peers.
if err := r.setPeers(r.joinPeers); err != nil {
return fmt.Errorf("set raft peers: %s", err)
}
return nil
}
// apply applies a serialized command to the raft log.
func (r *raftState) apply(b []byte) error {
// Apply to raft log.
@ -224,13 +213,24 @@ func (r *raftState) snapshot() error {
// addPeer adds addr to the list of peers in the cluster.
func (r *raftState) addPeer(addr string) error {
// peers, err := r.peerStore.Peers()
// if err != nil {
// return err
// }
// peers = append(peers, addr)
// if fut := r.raft.SetPeers(peers); fut.Error() != nil {
// return fut.Error()
// }
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
if len(peers) >= 3 {
return nil
for _, p := range peers {
if addr == p {
return nil
}
}
if fut := r.raft.AddPeer(addr); fut.Error() != nil {
@ -251,11 +251,6 @@ func (r *raftState) removePeer(addr string) error {
return nil
}
// setPeers sets a list of peers in the cluster.
func (r *raftState) setPeers(addrs []string) error {
return r.raft.SetPeers(addrs).Error()
}
func (r *raftState) peers() ([]string, error) {
return r.peerStore.Peers()
}
@ -278,23 +273,21 @@ func (r *raftState) isLeader() bool {
// raftLayer wraps the connection so it can be re-used for forwarding.
type raftLayer struct {
ln net.Listener
addr net.Addr
conn chan net.Conn
closed chan struct{}
}
// newRaftLayer returns a new instance of raftLayer.
func newRaftLayer(ln net.Listener, addr net.Addr) *raftLayer {
func newRaftLayer(ln net.Listener) *raftLayer {
return &raftLayer{
ln: ln,
addr: addr,
conn: make(chan net.Conn),
closed: make(chan struct{}),
}
}
// Addr returns the local address for the layer.
func (l *raftLayer) Addr() net.Addr { return l.addr }
func (l *raftLayer) Addr() net.Addr { return l.ln.Addr() }
// Dial creates a new network connection.
func (l *raftLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
@ -302,6 +295,12 @@ func (l *raftLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
if err != nil {
return nil, err
}
// Write a marker byte for raft messages.
_, err = conn.Write([]byte{MuxHeader})
if err != nil {
conn.Close()
return nil, err
}
return conn, err
}

View File

@ -8,13 +8,21 @@ import (
"net/http"
"os"
"strings"
"github.com/influxdb/influxdb"
)
const (
MuxHeader = 8
)
type Service struct {
RaftListener net.Listener
config *Config
node *influxdb.Node
handler *handler
ln net.Listener
raftAddr string
httpAddr string
https bool
cert string
@ -24,11 +32,10 @@ type Service struct {
}
// NewService returns a new instance of Service.
func NewService(c *Config) *Service {
func NewService(c *Config, node *influxdb.Node) *Service {
s := &Service{
config: c,
raftAddr: c.RaftBindAddress,
httpAddr: c.HTTPdBindAddress,
httpAddr: c.HTTPBindAddress,
https: c.HTTPSEnabled,
cert: c.HTTPSCertificate,
err: make(chan error),
@ -41,20 +48,10 @@ func NewService(c *Config) *Service {
func (s *Service) Open() error {
s.Logger.Println("Starting meta service")
// Open the store
store := newStore(s.config)
// Set the peers from the config
store.peers = s.config.Peers
s.store = store
if err := s.store.open(); err != nil {
return err
if s.RaftListener == nil {
panic("no raft listener set")
}
handler := newHandler(s.config)
handler.logger = s.Logger
handler.store = store
s.handler = handler
// Open listener.
if s.https {
cert, err := tls.LoadX509KeyPair(s.cert, s.cert)
@ -82,6 +79,17 @@ func (s *Service) Open() error {
}
s.httpAddr = s.ln.Addr().String()
// Open the store
s.store = newStore(s.config)
if err := s.store.open(s.ln.Addr().String(), s.RaftListener); err != nil {
return err
}
handler := newHandler(s.config)
handler.logger = s.Logger
handler.store = s.store
s.handler = handler
// Begin listening for requests in a separate goroutine.
go s.serve()
return nil

View File

@ -1,45 +1,33 @@
package meta
package meta_test
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"runtime"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/services/meta/internal"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tcp"
)
func TestService_Open(t *testing.T) {
t.Parallel()
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
// Test the ping endpoint.
func TestService_PingEndpoint(t *testing.T) {
func TestMetaService_PingEndpoint(t *testing.T) {
t.Parallel()
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
s := NewService(cfg)
s := newService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
url, err := url.Parse(s.URL())
if err != nil {
@ -64,167 +52,115 @@ func TestService_PingEndpoint(t *testing.T) {
}
}
// Test creating a node in the meta service.
func TestService_CreateNode(t *testing.T) {
t.Parallel()
func TestMetaService_CreateDatabase(t *testing.T) {
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
s := NewService(cfg)
s := newService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
before, err := snapshot(s, 0)
c := meta.NewClient([]string{s.URL()}, false)
if err := c.Open(); err != nil {
t.Fatalf(err.Error())
}
defer c.Close()
c.ExecuteStatement(mustParseStatement("CREATE DATABASE foo"))
db, err := c.Database("foo")
if err != nil {
t.Fatal(err)
t.Fatalf(err.Error())
}
node := before.Node(1)
if node != nil {
t.Fatal("expected <nil> but got a node")
}
host := "127.0.0.1"
cmdval := &internal.CreateNodeCommand{
Host: proto.String(host),
Rand: proto.Uint64(42),
}
if err := exec(s, internal.Command_CreateNodeCommand, internal.E_CreateNodeCommand_Command, cmdval); err != nil {
t.Fatal(err)
}
after, err := snapshot(s, 0)
if err != nil {
t.Fatal(err)
}
node = after.Node(1)
if node == nil {
t.Fatal("expected node but got <nil>")
} else if node.Host != host {
t.Fatalf("unexpected host:\n\texp: %s\n\tgot: %s\n", host, node.Host)
if db.Name != "foo" {
t.Fatalf("db name wrong: %s", db.Name)
}
}
// Test creating a database in the meta service.
func TestService_CreateDatabase(t *testing.T) {
t.Parallel()
func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
cfg1 := newConfig()
defer os.RemoveAll(cfg1.Dir)
cfg2 := newConfig()
defer os.RemoveAll(cfg2.Dir)
cfg3 := newConfig()
defer os.RemoveAll(cfg3.Dir)
cfg4 := newConfig()
defer os.RemoveAll(cfg4.Dir)
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
s1 := newService(cfg1)
if err := s1.Open(); err != nil {
t.Fatalf(err.Error())
}
defer s1.Close()
before, err := snapshot(s, 0)
if err != nil {
t.Fatal(err)
cfg2.JoinPeers = []string{s1.URL()}
s2 := newService(cfg2)
if err := s2.Open(); err != nil {
t.Fatal(err.Error())
}
defer s2.Close()
name := "mydb"
db := before.Database(name)
if db != nil {
t.Fatal("expected <nil> but got database")
}
cmdval := &internal.CreateDatabaseCommand{
Name: proto.String(name),
}
if err := exec(s, internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmdval); err != nil {
t.Fatal(err)
}
after, err := snapshot(s, 0)
if err != nil {
t.Fatal(err)
}
db = after.Database(name)
if db == nil {
t.Fatal("expected database but got <nil>")
} else if db.Name != name {
t.Fatalf("unexpected name:\n\texp: %s\n\tgot: %s\n", name, db.Name)
}
}
// Test long poll of snapshot.
// Clients will make a long poll request for a snapshot update by passing their
// current snapshot index. The meta service will respond to the request when
// its snapshot index exceeds the client's snapshot index.
func TestService_LongPoll(t *testing.T) {
t.Parallel()
cfg := newConfig()
defer os.RemoveAll(cfg.Dir)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
before, err := snapshot(s, 0)
if err != nil {
t.Fatal(err)
}
node := before.Node(1)
if node != nil {
t.Fatal("expected <nil> but got a node")
}
// Start a long poll request for a snapshot update.
ch := make(chan *Data)
errch := make(chan error)
go func() {
after, err := snapshot(s, 1)
if err != nil {
errch <- err
func() {
cfg3.JoinPeers = []string{s2.URL()}
s3 := newService(cfg3)
if err := s3.Open(); err != nil {
t.Fatal(err.Error())
}
ch <- after
}()
defer s3.Close()
// Fire off an update after a delay.
host := "127.0.0.1"
update := make(chan struct{})
go func() {
<-update
cmdval := &internal.CreateNodeCommand{
Host: proto.String(host),
Rand: proto.Uint64(42),
fmt.Println("ALL OPEN!")
c1 := meta.NewClient([]string{s1.URL()}, false)
if err := c1.Open(); err != nil {
t.Fatal(err.Error())
}
if err := exec(s, internal.Command_CreateNodeCommand, internal.E_CreateNodeCommand_Command, cmdval); err != nil {
errch <- err
defer c1.Close()
metaNodes, _ := c1.MetaNodes()
if len(metaNodes) != 3 {
t.Fatalf("meta nodes wrong: %v", metaNodes)
}
}()
for i := 0; i < 2; i++ {
select {
case after := <-ch:
node = after.Node(1)
if node == nil {
t.Fatal("expected node but got <nil>")
} else if node.Host != host {
t.Fatalf("unexpected host:\n\texp: %s\n\tgot: %s\n", host, node.Host)
}
case err := <-errch:
t.Fatal(err)
case <-time.After(time.Second):
// First time through the loop it should time out because update hasn't happened.
if i == 0 {
// Signal the update
update <- struct{}{}
} else {
t.Fatal("timed out waiting for snapshot update")
}
}
c := meta.NewClient([]string{s1.URL()}, false)
if err := c.Open(); err != nil {
t.Fatal(err.Error())
}
defer c.Close()
if res := c.ExecuteStatement(mustParseStatement("DROP META SERVER 3")); res.Err != nil {
t.Fatal(res.Err)
}
metaNodes, _ := c.MetaNodes()
if len(metaNodes) != 2 {
t.Fatalf("meta nodes wrong: %v", metaNodes)
}
cfg4.JoinPeers = []string{s1.URL()}
s4 := newService(cfg4)
if err := s4.Open(); err != nil {
t.Fatal(err.Error())
}
defer s4.Close()
metaNodes, _ = c.MetaNodes()
if len(metaNodes) != 3 {
t.Fatalf("meta nodes wrong: %v", metaNodes)
}
}
func newConfig() *Config {
cfg := NewConfig()
cfg.RaftBindAddress = "127.0.0.1:0"
cfg.HTTPdBindAddress = "127.0.0.1:0"
// Ensure that if we attempt to create a database and the client
// is pointed at a server that isn't the leader, it automatically
// hits the leader and finishes the command
func TestMetaService_CommandAgainstNonLeader(t *testing.T) {
}
// newConfig returns a meta.Config suitable for tests: listeners bound to
// localhost with OS-assigned ports and a fresh temporary data directory.
func newConfig() *meta.Config {
	cfg := meta.NewConfig()
	cfg.BindAddress = "127.0.0.1:0"     // raft/TCP listener; port 0 = any free port
	cfg.HTTPBindAddress = "127.0.0.1:0" // HTTP API listener
	cfg.Dir = testTempDir(2)
	return cfg
}
@ -244,46 +180,28 @@ func testTempDir(skip int) string {
return dir
}
func mustProtoMarshal(v proto.Message) []byte {
b, err := proto.Marshal(v)
func newService(cfg *meta.Config) *meta.Service {
// Open shared TCP connection.
ln, err := net.Listen("tcp", cfg.BindAddress)
if err != nil {
panic(err)
}
return b
// Multiplex listener.
mux := tcp.NewMux()
s := meta.NewService(cfg, &influxdb.Node{})
s.RaftListener = mux.Listen(meta.MuxHeader)
go mux.Serve(ln)
return s
}
func snapshot(s *Service, index int) (*Data, error) {
url := fmt.Sprintf("http://%s?index=%d", s.URL(), index)
resp, err := http.Get(url)
func mustParseStatement(s string) influxql.Statement {
stmt, err := influxql.ParseStatement(s)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
data := &Data{}
if err := data.UnmarshalBinary(b); err != nil {
return nil, err
}
return data, nil
}
func exec(s *Service, typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
// Create command.
cmd := &internal.Command{Type: &typ}
if err := proto.SetExtension(cmd, desc, value); err != nil {
panic(err)
}
b := mustProtoMarshal(cmd)
url := fmt.Sprintf("http://%s/execute", s.URL())
resp, err := http.Post(url, "application/octet-stream", bytes.NewBuffer(b))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected result:\n\texp: %d\n\tgot: %d\n", http.StatusOK, resp.StatusCode)
}
return nil
return stmt
}

View File

@ -0,0 +1,457 @@
package meta
import (
"bytes"
"fmt"
"strconv"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
)
// StatementExecutor translates InfluxQL queries to meta store methods.
type StatementExecutor struct {
	// Store is the subset of meta-store operations the executor needs.
	// It is satisfied by the meta store itself and can be mocked in tests.
	Store interface {
		// Node management.
		DataNode(id uint64) (ni *NodeInfo, err error)
		DataNodes() ([]NodeInfo, error)
		MetaNodes() ([]NodeInfo, error)
		DeleteDataNode(nodeID uint64) error
		DeleteMetaNode(nodeID uint64) error

		// Database management.
		Database(name string) (*DatabaseInfo, error)
		Databases() ([]DatabaseInfo, error)
		CreateDatabase(name string) (*DatabaseInfo, error)
		CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error)
		DropDatabase(name string) error

		// Retention policy management.
		CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
		UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error
		SetDefaultRetentionPolicy(database, name string) error
		DropRetentionPolicy(database, name string) error

		// User and privilege management.
		Users() ([]UserInfo, error)
		CreateUser(name, password string, admin bool) (*UserInfo, error)
		UpdateUser(name, password string) error
		DropUser(name string) error
		SetPrivilege(username, database string, p influxql.Privilege) error
		SetAdminPrivilege(username string, admin bool) error
		UserPrivileges(username string) (map[string]influxql.Privilege, error)
		UserPrivilege(username, database string) (*influxql.Privilege, error)

		// Continuous queries and subscriptions.
		CreateContinuousQuery(database, name, query string) error
		DropContinuousQuery(database, name string) error
		CreateSubscription(database, rp, name, mode string, destinations []string) error
		DropSubscription(database, rp, name string) error
	}
}
// ExecuteStatement executes stmt against the meta store as user.
//
// It dispatches on the concrete statement type to the corresponding
// execute* method. Callers are expected to route only meta-level
// statements here; any other type is a programming error and panics.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
	switch stmt := stmt.(type) {
	case *influxql.CreateDatabaseStatement:
		return e.executeCreateDatabaseStatement(stmt)
	case *influxql.DropDatabaseStatement:
		return e.executeDropDatabaseStatement(stmt)
	case *influxql.ShowDatabasesStatement:
		return e.executeShowDatabasesStatement(stmt)
	case *influxql.ShowGrantsForUserStatement:
		return e.executeShowGrantsForUserStatement(stmt)
	case *influxql.ShowServersStatement:
		return e.executeShowServersStatement(stmt)
	case *influxql.CreateUserStatement:
		return e.executeCreateUserStatement(stmt)
	case *influxql.SetPasswordUserStatement:
		return e.executeSetPasswordUserStatement(stmt)
	case *influxql.DropUserStatement:
		return e.executeDropUserStatement(stmt)
	case *influxql.ShowUsersStatement:
		return e.executeShowUsersStatement(stmt)
	case *influxql.GrantStatement:
		return e.executeGrantStatement(stmt)
	case *influxql.GrantAdminStatement:
		return e.executeGrantAdminStatement(stmt)
	case *influxql.RevokeStatement:
		return e.executeRevokeStatement(stmt)
	case *influxql.RevokeAdminStatement:
		return e.executeRevokeAdminStatement(stmt)
	case *influxql.CreateRetentionPolicyStatement:
		return e.executeCreateRetentionPolicyStatement(stmt)
	case *influxql.AlterRetentionPolicyStatement:
		return e.executeAlterRetentionPolicyStatement(stmt)
	case *influxql.DropRetentionPolicyStatement:
		return e.executeDropRetentionPolicyStatement(stmt)
	case *influxql.ShowRetentionPoliciesStatement:
		return e.executeShowRetentionPoliciesStatement(stmt)
	case *influxql.CreateContinuousQueryStatement:
		return e.executeCreateContinuousQueryStatement(stmt)
	case *influxql.DropContinuousQueryStatement:
		return e.executeDropContinuousQueryStatement(stmt)
	case *influxql.ShowContinuousQueriesStatement:
		return e.executeShowContinuousQueriesStatement(stmt)
	case *influxql.ShowShardsStatement:
		return e.executeShowShardsStatement(stmt)
	case *influxql.ShowShardGroupsStatement:
		return e.executeShowShardGroupsStatement(stmt)
	case *influxql.ShowStatsStatement:
		return e.executeShowStatsStatement(stmt)
	case *influxql.DropServerStatement:
		return e.executeDropServerStatement(stmt)
	case *influxql.CreateSubscriptionStatement:
		return e.executeCreateSubscriptionStatement(stmt)
	case *influxql.DropSubscriptionStatement:
		return e.executeDropSubscriptionStatement(stmt)
	case *influxql.ShowSubscriptionsStatement:
		return e.executeShowSubscriptionsStatement(stmt)
	default:
		panic(fmt.Sprintf("unsupported statement type: %T", stmt))
	}
}
// executeCreateDatabaseStatement creates a database, optionally together
// with an initial retention policy (CREATE DATABASE ... WITH ...).
func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement) *influxql.Result {
	var err error
	if q.RetentionPolicyCreate {
		// Build the retention policy from the statement's WITH clause.
		rpi := NewRetentionPolicyInfo(q.RetentionPolicyName)
		rpi.Duration = q.RetentionPolicyDuration
		rpi.ReplicaN = q.RetentionPolicyReplication
		_, err = e.Store.CreateDatabaseWithRetentionPolicy(q.Name, rpi)
	} else {
		_, err = e.Store.CreateDatabase(q.Name)
	}

	// IF NOT EXISTS swallows the already-exists error.
	if q.IfNotExists && err == ErrDatabaseExists {
		err = nil
	}
	return &influxql.Result{Err: err}
}
// executeDropDatabaseStatement removes the named database from the meta store.
func (e *StatementExecutor) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement) *influxql.Result {
	err := e.Store.DropDatabase(q.Name)
	return &influxql.Result{Err: err}
}
// executeShowDatabasesStatement returns a single "databases" row set with
// one value per database name.
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) *influxql.Result {
	dis, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	row := &models.Row{Name: "databases", Columns: []string{"name"}}
	for i := range dis {
		row.Values = append(row.Values, []interface{}{dis[i].Name})
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeShowGrantsForUserStatement lists each database the user has a
// privilege on along with the privilege name.
func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) *influxql.Result {
	privileges, err := e.Store.UserPrivileges(q.Name)
	if err != nil {
		return &influxql.Result{Err: err}
	}

	row := &models.Row{Columns: []string{"database", "privilege"}}
	for database, privilege := range privileges {
		row.Values = append(row.Values, []interface{}{database, privilege.String()})
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeShowServersStatement returns two row sets — "data_nodes" and
// "meta_nodes" — each with id, HTTP address, and raft/TCP address.
func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) *influxql.Result {
	dataNodes := &models.Row{Name: "data_nodes", Columns: []string{"id", "http_addr", "tcp_addr"}}
	nodes, err := e.Store.DataNodes()
	if err != nil {
		return &influxql.Result{Err: err}
	}
	for _, n := range nodes {
		dataNodes.Values = append(dataNodes.Values, []interface{}{n.ID, n.Host, n.TCPHost})
	}

	metaNodes := &models.Row{Name: "meta_nodes", Columns: []string{"id", "http_addr", "tcp_addr"}}
	nodes, err = e.Store.MetaNodes()
	if err != nil {
		return &influxql.Result{Err: err}
	}
	for _, n := range nodes {
		metaNodes.Values = append(metaNodes.Values, []interface{}{n.ID, n.Host, n.TCPHost})
	}

	return &influxql.Result{Series: []*models.Row{dataNodes, metaNodes}}
}
// executeDropServerStatement removes a node from the cluster.
// DROP META SERVER deletes a meta node; DROP DATA SERVER deletes a data node.
func (e *StatementExecutor) executeDropServerStatement(q *influxql.DropServerStatement) *influxql.Result {
	if q.Meta {
		return &influxql.Result{Err: e.Store.DeleteMetaNode(q.NodeID)}
	}
	return &influxql.Result{Err: e.Store.DeleteDataNode(q.NodeID)}
}
// executeCreateUserStatement creates a user, optionally with admin rights.
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result {
	_, err := e.Store.CreateUser(q.Name, q.Password, q.Admin)
	return &influxql.Result{Err: err}
}
// executeSetPasswordUserStatement updates an existing user's password.
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) *influxql.Result {
	err := e.Store.UpdateUser(q.Name, q.Password)
	return &influxql.Result{Err: err}
}
// executeDropUserStatement removes the named user from the meta store.
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) *influxql.Result {
	err := e.Store.DropUser(q.Name)
	return &influxql.Result{Err: err}
}
// executeShowUsersStatement lists all users and their admin flag.
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) *influxql.Result {
	users, err := e.Store.Users()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	row := &models.Row{Columns: []string{"user", "admin"}}
	for i := range users {
		row.Values = append(row.Values, []interface{}{users[i].Name, users[i].Admin})
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeGrantStatement grants a database-level privilege to a user.
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) *influxql.Result {
	err := e.Store.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
	return &influxql.Result{Err: err}
}
// executeGrantAdminStatement promotes a user to cluster admin.
func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) *influxql.Result {
	err := e.Store.SetAdminPrivilege(stmt.User, true)
	return &influxql.Result{Err: err}
}
// executeRevokeStatement revokes a privilege from a user on a database.
// The user's remaining privilege is the existing privilege with the
// revoked bits cleared; revoking ALL simply sets NoPrivileges.
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) *influxql.Result {
	priv := influxql.NoPrivileges

	// Revoking all privileges means there's no need to look at existing user privileges.
	if stmt.Privilege != influxql.AllPrivileges {
		p, err := e.Store.UserPrivilege(stmt.User, stmt.On)
		if err != nil {
			return &influxql.Result{Err: err}
		}
		// Guard against a store returning (nil, nil); dereferencing a nil
		// *Privilege would panic. A missing privilege revokes to none.
		if p != nil {
			// Bit clear (AND NOT) the user's privilege with the revoked privilege.
			priv = *p &^ stmt.Privilege
		}
	}

	return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, priv)}
}
// executeRevokeAdminStatement demotes a user from cluster admin.
func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) *influxql.Result {
	err := e.Store.SetAdminPrivilege(stmt.User, false)
	return &influxql.Result{Err: err}
}
// executeCreateRetentionPolicyStatement creates a retention policy and,
// when DEFAULT is specified, marks it as the database's default policy.
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) *influxql.Result {
	rpi := NewRetentionPolicyInfo(stmt.Name)
	rpi.Duration = stmt.Duration
	rpi.ReplicaN = stmt.Replication

	// Create new retention policy.
	if _, err := e.Store.CreateRetentionPolicy(stmt.Database, rpi); err != nil {
		return &influxql.Result{Err: err}
	}

	// If requested, set new policy as the default.
	var err error
	if stmt.Default {
		err = e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
	}
	return &influxql.Result{Err: err}
}
// executeAlterRetentionPolicyStatement updates a retention policy's
// duration/replication and, when DEFAULT is specified, makes it the default.
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) *influxql.Result {
	update := &RetentionPolicyUpdate{
		Duration: stmt.Duration,
		ReplicaN: stmt.Replication,
	}

	// Update the retention policy.
	if err := e.Store.UpdateRetentionPolicy(stmt.Database, stmt.Name, update); err != nil {
		return &influxql.Result{Err: err}
	}

	// If requested, set as default retention policy.
	var err error
	if stmt.Default {
		err = e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
	}
	return &influxql.Result{Err: err}
}
// executeDropRetentionPolicyStatement removes a retention policy from a database.
func (e *StatementExecutor) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement) *influxql.Result {
	err := e.Store.DropRetentionPolicy(q.Database, q.Name)
	return &influxql.Result{Err: err}
}
// executeShowRetentionPoliciesStatement lists each retention policy of the
// database with its duration, replication factor, and default flag.
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) *influxql.Result {
	di, err := e.Store.Database(q.Database)
	if err != nil {
		return &influxql.Result{Err: err}
	}
	if di == nil {
		return &influxql.Result{Err: influxdb.ErrDatabaseNotFound(q.Database)}
	}

	row := &models.Row{Columns: []string{"name", "duration", "replicaN", "default"}}
	for _, rpi := range di.RetentionPolicies {
		isDefault := di.DefaultRetentionPolicy == rpi.Name
		row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ReplicaN, isDefault})
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeCreateContinuousQueryStatement stores the CQ definition (its full
// statement text) under the given database and name.
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) *influxql.Result {
	err := e.Store.CreateContinuousQuery(q.Database, q.Name, q.String())
	return &influxql.Result{Err: err}
}
// executeDropContinuousQueryStatement removes a continuous query by name.
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) *influxql.Result {
	err := e.Store.DropContinuousQuery(q.Database, q.Name)
	return &influxql.Result{Err: err}
}
// executeShowContinuousQueriesStatement returns one row set per database,
// listing each continuous query's name and text. Databases with no CQs
// still produce an (empty) row set.
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) *influxql.Result {
	dis, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Name: di.Name, Columns: []string{"name", "query"}}
		for _, cq := range di.ContinuousQueries {
			row.Values = append(row.Values, []interface{}{cq.Name, cq.Query})
		}
		rows = append(rows, row)
	}
	return &influxql.Result{Series: rows}
}
// executeCreateSubscriptionStatement registers a subscription on a
// database/retention-policy pair with its mode and destinations.
func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) *influxql.Result {
	err := e.Store.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations)
	return &influxql.Result{Err: err}
}
// executeDropSubscriptionStatement removes a subscription by name.
func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) *influxql.Result {
	err := e.Store.DropSubscription(q.Database, q.RetentionPolicy, q.Name)
	return &influxql.Result{Err: err}
}
// executeShowSubscriptionsStatement returns one row set per database that
// has at least one subscription; databases without subscriptions are omitted.
func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) *influxql.Result {
	dis, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Name: di.Name, Columns: []string{"retention_policy", "name", "mode", "destinations"}}
		for _, rpi := range di.RetentionPolicies {
			for _, sub := range rpi.Subscriptions {
				row.Values = append(row.Values, []interface{}{rpi.Name, sub.Name, sub.Mode, sub.Destinations})
			}
		}
		// Skip databases with no subscriptions.
		if len(row.Values) == 0 {
			continue
		}
		rows = append(rows, row)
	}
	return &influxql.Result{Series: rows}
}
// executeShowShardGroupsStatement lists every live shard group across all
// databases with its time range and expiry (end time + RP duration).
func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) *influxql.Result {
	dis, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	row := &models.Row{Name: "shard groups", Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}}
	for _, di := range dis {
		for _, rpi := range di.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				// Shards associated with deleted shard groups are effectively deleted.
				// Don't list them.
				if sgi.Deleted() {
					continue
				}

				values := []interface{}{
					sgi.ID,
					di.Name,
					rpi.Name,
					sgi.StartTime.UTC().Format(time.RFC3339),
					sgi.EndTime.UTC().Format(time.RFC3339),
					sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
				}
				row.Values = append(row.Values, values)
			}
		}
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeShowShardsStatement returns one row set per database listing each
// live shard with its group, time range, expiry, and owning node IDs.
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) *influxql.Result {
	dis, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Name: di.Name, Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}}
		for _, rpi := range di.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				// Shards associated with deleted shard groups are effectively deleted.
				// Don't list them.
				if sgi.Deleted() {
					continue
				}

				for _, si := range sgi.Shards {
					// Collect owning node IDs as a comma-delimited string.
					ownerIDs := make([]uint64, len(si.Owners))
					for i := range si.Owners {
						ownerIDs[i] = si.Owners[i].NodeID
					}

					values := []interface{}{
						si.ID,
						di.Name,
						rpi.Name,
						sgi.ID,
						sgi.StartTime.UTC().Format(time.RFC3339),
						sgi.EndTime.UTC().Format(time.RFC3339),
						sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
						joinUint64(ownerIDs),
					}
					row.Values = append(row.Values, values)
				}
			}
		}
		rows = append(rows, row)
	}
	return &influxql.Result{Series: rows}
}
// executeShowStatsStatement is a placeholder; SHOW STATS is not yet
// supported by the meta service.
func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result {
	err := fmt.Errorf("SHOW STATS is not implemented yet")
	return &influxql.Result{Err: err}
}
// joinUint64 returns a comma-delimited string of uint64 numbers.
func joinUint64(a []uint64) string {
	var buf bytes.Buffer
	for i, v := range a {
		// Separate consecutive values with a comma.
		if i > 0 {
			buf.WriteRune(',')
		}
		buf.WriteString(strconv.FormatUint(v, 10))
	}
	return buf.String()
}

File diff suppressed because it is too large Load Diff

View File

@ -1,25 +1,24 @@
package meta
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/influxdb/influxdb/services/meta/internal"
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
)
type store struct {
id uint64 // local node id
const raftListenerStartupTimeout = time.Second
type store struct {
mu sync.RWMutex
closing chan struct{}
@ -27,12 +26,8 @@ type store struct {
data *Data
raftState *raftState
dataChanged chan struct{}
ready chan struct{}
addr string
raftln net.Listener
path string
opened bool
peers []string
logger *log.Logger
// Authentication cache.
@ -44,15 +39,14 @@ type authUser struct {
hash []byte
}
// newStore will create a new metastore with the passed in config
func newStore(c *Config) *store {
s := store{
data: &Data{
Index: 1,
},
ready: make(chan struct{}),
closing: make(chan struct{}),
dataChanged: make(chan struct{}),
addr: c.RaftBindAddress,
path: c.Dir,
config: c,
}
@ -66,16 +60,33 @@ func newStore(c *Config) *store {
}
// open opens and initializes the raft store.
func (s *store) open() error {
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.raftln = ln
s.addr = ln.Addr().String()
func (s *store) open(addr string, raftln net.Listener) error {
s.logger.Printf("Using data dir: %v", s.path)
// wait for the raft listener to start
timeout := time.Now().Add(raftListenerStartupTimeout)
for {
if raftln.Addr() != nil {
break
}
if time.Now().After(timeout) {
return fmt.Errorf("unable to open without raft listener running")
}
time.Sleep(10 * time.Millisecond)
}
// See if this server needs to join the raft consensus group
var initializePeers []string
if len(s.config.JoinPeers) > 0 {
c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled)
data := c.retryUntilSnapshot(0)
for _, n := range data.MetaNodes {
initializePeers = append(initializePeers, n.TCPHost)
}
initializePeers = append(initializePeers, raftln.Addr().String())
}
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
@ -86,134 +97,63 @@ func (s *store) open() error {
}
s.opened = true
// load our raft peers
if err := s.loadPeers(); err != nil {
return err
}
// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(s.path, 0777); err != nil {
return fmt.Errorf("mkdir all: %s", err)
}
// Open the raft store.
if err := s.openRaft(); err != nil {
if err := s.openRaft(initializePeers, raftln); err != nil {
return fmt.Errorf("raft: %s", err)
}
// Initialize the store, if necessary.
if err := s.raftState.initialize(); err != nil {
return fmt.Errorf("initialize raft: %s", err)
}
// Load existing ID, if exists.
if err := s.readID(); err != nil {
return fmt.Errorf("read id: %s", err)
}
return nil
}(); err != nil {
return err
}
// Join an existing cluster if we needed
if err := s.joinCluster(); err != nil {
return fmt.Errorf("join: %v", err)
}
if len(s.config.JoinPeers) > 0 {
c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled)
if err := c.Open(); err != nil {
return err
}
defer c.Close()
// If the ID doesn't exist then create a new node.
if s.id == 0 {
go s.raftState.initialize()
} else {
// TODO: enable node info sync
// all this does is update the raft peers with the new hostname of this node if it changed
// based on the ID of this node
// go s.syncNodeInfo()
close(s.ready)
}
// Wait for a leader to be elected so we know the raft log is loaded
// and up to date
//<-s.ready
if err := s.waitForLeader(0); err != nil {
return err
}
return nil
}
// loadPeers sets the appropriate peers from our persistent storage
func (s *store) loadPeers() error {
peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
if err != nil {
return err
}
// If we have existing peers, use those. This will override what's in the
// config.
if len(peers) > 0 {
s.peers = peers
if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err != nil {
if err := c.JoinMetaServer(addr, raftln.Addr().String()); err != nil {
return err
}
}
return nil
}
func readPeersJSON(path string) ([]string, error) {
// Read the file
buf, err := ioutil.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
return nil, err
// Wait for a leader to be elected so we know the raft log is loaded
// and up to date
if err := s.waitForLeader(0); err != nil {
return err
}
// Check for no peers
if len(buf) == 0 {
return nil, nil
}
// Decode the peers
var peers []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peers); err != nil {
return nil, err
}
return peers, nil
}
// IDPath returns the path to the local node ID file.
func (s *store) IDPath() string { return filepath.Join(s.path, "id") }
// readID reads the local node ID from the ID file.
func (s *store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
if os.IsNotExist(err) {
s.id = 0
return nil
} else if err != nil {
return fmt.Errorf("read file: %s", err)
}
id, err := strconv.ParseUint(string(b), 10, 64)
// Make sure this server is in the list of metanodes
peers, err := s.raftState.peers()
if err != nil {
return fmt.Errorf("parse id: %s", err)
return err
}
if len(peers) <= 1 {
if err := s.createMetaNode(addr, raftln.Addr().String()); err != nil {
return err
}
}
// if we joined this server to the cluster, we need to add it as a metanode
if len(s.config.JoinPeers) > 0 {
}
s.id = id
return nil
}
func (s *store) openRaft() error {
rs := newRaftState(s.config, s.peers)
rs.ln = s.raftln
func (s *store) openRaft(initializePeers []string, raftln net.Listener) error {
rs := newRaftState(s.config)
rs.logger = s.logger
rs.path = s.path
rs.remoteAddr = s.raftln.Addr()
if err := rs.open(s); err != nil {
if err := rs.open(s, raftln, initializePeers); err != nil {
return err
}
s.raftState = rs
@ -221,51 +161,6 @@ func (s *store) openRaft() error {
return nil
}
func (s *store) joinCluster() error {
// No join options, so nothing to do
if len(s.peers) == 0 {
return nil
}
// We already have a node ID so were already part of a cluster,
// don't join again so we can use our existing state.
if s.id != 0 {
s.logger.Printf("Skipping cluster join: already member of cluster: nodeId=%v raftEnabled=%v peers=%v",
s.id, raft.PeerContained(s.peers, s.addr), s.peers)
return nil
}
s.logger.Printf("Joining cluster at: %v", s.peers)
for {
for _, join := range s.peers {
// delete me:
_ = join
// TODO rework this to use the HTTP endpoint for joining
//res, err := s.rpc.join(s.RemoteAddr.String(), join)
//if err != nil {
//s.logger.Printf("Join node %v failed: %v: retrying...", join, err)
//continue
//}
//s.logger.Printf("Joined remote node %v", join)
//s.logger.Printf("nodeId=%v raftEnabled=%v peers=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
//s.peers = res.RaftNodes
//s.id = res.NodeID
//if err := s.writeNodeID(res.NodeID); err != nil {
//s.logger.Printf("Write node id failed: %v", err)
//break
//}
return nil
}
time.Sleep(time.Second)
}
}
func (s *store) close() error {
s.mu.Lock()
defer s.mu.Unlock()
@ -342,6 +237,25 @@ func (s *store) leader() string {
return s.raftState.raft.Leader()
}
// leaderHTTP returns the HTTP API connection info for the metanode
// that is the raft leader
func (s *store) leaderHTTP() string {
	// Read lock: we only inspect raft state and cluster metadata.
	s.mu.RLock()
	defer s.mu.RUnlock()
	// Store not opened (or already closed): no leader to report.
	if s.raftState == nil {
		return ""
	}
	l := s.raftState.raft.Leader()

	// Map the leader's raft (TCP) address back to the corresponding
	// meta node's HTTP address.
	for _, n := range s.data.MetaNodes {
		if n.TCPHost == l {
			return n.Host
		}
	}

	// Leader unknown or not (yet) present in MetaNodes — e.g. during an
	// election or before this node's metadata has caught up.
	return ""
}
// index returns the current store index.
func (s *store) index() uint64 {
s.mu.RLock()
@ -354,6 +268,39 @@ func (s *store) apply(b []byte) error {
return s.raftState.apply(b)
}
// join adds a new server to the metaservice and raft
func (s *store) join(n *NodeInfo) error {
	// First make the node a raft peer; only then record it in the
	// cluster metadata as a meta node.
	if err := s.raftState.addPeer(n.TCPHost); err != nil {
		return err
	}
	err := s.createMetaNode(n.Host, n.TCPHost)
	return err
}
// leave removes a server from the metaservice and raft
func (s *store) leave(n *NodeInfo) error {
	err := s.raftState.removePeer(n.TCPHost)
	return err
}
// createMetaNode applies a CreateMetaNode raft command recording the
// node's HTTP address and raft (TCP) address in the cluster metadata.
func (s *store) createMetaNode(addr, raftAddr string) error {
	typ := internal.Command_CreateMetaNodeCommand
	cmd := &internal.Command{Type: &typ}
	val := &internal.CreateMetaNodeCommand{
		HTTPAddr: proto.String(addr),
		TCPAddr:  proto.String(raftAddr),
	}
	// SetExtension only fails on a programming error (wrong extension
	// descriptor), so a panic is appropriate here.
	if err := proto.SetExtension(cmd, internal.E_CreateMetaNodeCommand_Command, val); err != nil {
		panic(err)
	}

	b, err := proto.Marshal(cmd)
	if err != nil {
		return err
	}
	return s.apply(b)
}
// RetentionPolicyUpdate represents retention policy fields to be updated.
type RetentionPolicyUpdate struct {
Name *string

View File

@ -31,7 +31,13 @@ func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
case internal.Command_RemovePeerCommand:
return fsm.applyRemovePeerCommand(&cmd)
case internal.Command_CreateNodeCommand:
return fsm.applyCreateNodeCommand(&cmd)
// create node was in < 0.10.0 servers, we need the peers
// list to convert to the appropriate data/meta nodes now
peers, err := s.raftState.peers()
if err != nil {
return err
}
return fsm.applyCreateNodeCommand(&cmd, peers)
case internal.Command_DeleteNodeCommand:
return fsm.applyDeleteNodeCommand(&cmd)
case internal.Command_CreateDatabaseCommand:
@ -72,6 +78,10 @@ func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
return fsm.applySetDataCommand(&cmd)
case internal.Command_UpdateNodeCommand:
return fsm.applyUpdateNodeCommand(&cmd)
case internal.Command_CreateMetaNodeCommand:
return fsm.applyCreateMetaNodeCommand(&cmd)
case internal.Command_DeleteMetaNodeCommand:
return fsm.applyDeleteMetaNodeCommand(&cmd, s)
default:
panic(fmt.Errorf("cannot apply command: %x", l.Data))
}
@ -92,36 +102,45 @@ func (fsm *storeFSM) applyRemovePeerCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_RemovePeerCommand_Command)
v := ext.(*internal.RemovePeerCommand)
id := v.GetID()
addr := v.GetAddr()
// Only do this if you are the leader
if fsm.raftState.isLeader() {
//Remove that node from the peer
fsm.logger.Printf("removing peer for node id %d, %s", id, addr)
fsm.logger.Printf("removing peer: %s", addr)
if err := fsm.raftState.removePeer(addr); err != nil {
fsm.logger.Printf("error removing peer: %s", err)
}
}
// If this is the node being shutdown, close raft
if fsm.id == id {
fsm.logger.Printf("shutting down raft for %s", addr)
if err := fsm.raftState.close(); err != nil {
fsm.logger.Printf("failed to shut down raft: %s", err)
}
}
return nil
}
func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command, peers []string) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command)
v := ext.(*internal.CreateNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateNode(v.GetHost()); err != nil {
// CreateNode is a command from < 0.10.0 clusters. Every node in
// those clusters would be a data node and only the nodes that are
// in the list of peers would be meta nodes
isMeta := false
for _, p := range peers {
if v.GetHost() == p {
isMeta = true
break
}
}
if isMeta {
if err := other.CreateMetaNode(v.GetHost(), v.GetHost()); err != nil {
return err
}
}
if err := other.CreateDataNode(v.GetHost()); err != nil {
return err
}
@ -134,37 +153,32 @@ func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
return nil
}
// applyUpdateNodeCommand handles the UpdateNode command from < 0.10.0
// clusters; it is now a no-op.
//
// The original body decoded the command extension into locals it never
// used, which does not compile in Go ("declared but not used"); the decode
// is dropped along with the unused variables.
func (fsm *storeFSM) applyUpdateNodeCommand(cmd *internal.Command) interface{} {
	return nil
}
func (fsm *storeFSM) applyUpdateDataNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command)
v := ext.(*internal.UpdateDataNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
ni := other.Node(v.GetID())
if ni == nil {
node := other.DataNode(v.GetID())
if node == nil {
return ErrNodeNotFound
}
ni.Host = v.GetHost()
node.Host = v.GetHost()
node.TCPHost = v.GetTCPHost()
fsm.data = other
return nil
}
// applyDeleteNodeCommand is from < 0.10.0. no op for this one: the removal
// is only logged and the cluster metadata is left untouched.
//
// The rendered block interleaved the removed pre-0.10 mutation body
// (Clone/DeleteNode/fsm.data swap) with the new no-op body; per the
// comment above, only the logging version is kept.
func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
	ext, _ := proto.GetExtension(cmd, internal.E_DeleteNodeCommand_Command)
	v := ext.(*internal.DeleteNodeCommand)

	id := v.GetID()
	fsm.logger.Printf("node '%d' removed", id)
	return nil
}
@ -433,6 +447,37 @@ func (fsm *storeFSM) applySetDataCommand(cmd *internal.Command) interface{} {
return nil
}
// applyCreateMetaNodeCommand adds a meta node with the given HTTP and raft
// (TCP) addresses to the cluster metadata.
func (fsm *storeFSM) applyCreateMetaNodeCommand(cmd *internal.Command) interface{} {
	ext, _ := proto.GetExtension(cmd, internal.E_CreateMetaNodeCommand_Command)
	v := ext.(*internal.CreateMetaNodeCommand)

	// The original discarded CreateMetaNode's error and installed the clone
	// unconditionally; propagate the error like the sibling apply*Command
	// handlers do, and only swap fsm.data in on success.
	other := fsm.data.Clone()
	if err := other.CreateMetaNode(v.GetHTTPAddr(), v.GetTCPAddr()); err != nil {
		return err
	}
	fsm.data = other
	return nil
}
// applyDeleteMetaNodeCommand drops a meta node from the cluster metadata
// and removes it from the raft peer set.
func (fsm *storeFSM) applyDeleteMetaNodeCommand(cmd *internal.Command, s *store) interface{} {
	ext, _ := proto.GetExtension(cmd, internal.E_DeleteMetaNodeCommand_Command)
	v := ext.(*internal.DeleteMetaNodeCommand)

	clone := fsm.data.Clone()

	ni := clone.MetaNode(v.GetID())
	if ni == nil {
		return ErrNodeNotFound
	}

	// Ask raft to drop the peer. A non-leader applying this log entry gets
	// ErrNotLeader, which is expected and not treated as a failure.
	if err := s.leave(ni); err != nil && err != raft.ErrNotLeader {
		return err
	}

	if err := clone.DeleteMetaNode(v.GetID()); err != nil {
		return err
	}

	fsm.data = clone
	return nil
}
func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) {
s := (*store)(fsm)
s.mu.Lock()

View File

@ -41,6 +41,7 @@ func NewMux() *Mux {
// Serve handles connections from ln and multiplexes then across registered listener.
func (mux *Mux) Serve(ln net.Listener) error {
mux.ln = ln
for {
// Wait for the next connection.
// If it returns a temporary error then simply retry.
@ -112,7 +113,8 @@ func (mux *Mux) Listen(header byte) net.Listener {
// Create a new listener and assign it.
ln := &listener{
c: make(chan net.Conn),
c: make(chan net.Conn),
mux: mux,
}
mux.m[header] = ln
@ -121,7 +123,8 @@ func (mux *Mux) Listen(header byte) net.Listener {
// listener is a receiver for connections received by Mux.
type listener struct {
c chan net.Conn
c chan net.Conn
mux *Mux
}
// Accept waits for and returns the next connection to the listener.
@ -136,8 +139,13 @@ func (ln *listener) Accept() (c net.Conn, err error) {
// Close is a no-op. The mux's listener should be closed instead.
func (ln *listener) Close() error {
	return nil
}
// Addr always returns nil.
func (ln *listener) Addr() net.Addr { return nil }
// Addr reports the address of the mux's underlying net.Listener, or nil
// when the listener is not attached to a serving mux.
func (ln *listener) Addr() net.Addr {
	if ln.mux == nil {
		return nil
	}
	if ln.mux.ln == nil {
		return nil
	}
	return ln.mux.ln.Addr()
}
// Dial connects to a remote mux listener with a given header byte.
func Dial(network, address string, header byte) (net.Conn, error) {