Support dropping non-Raft nodes

pull/4310/head
Philip O'Toole 2015-10-02 19:49:11 -07:00
parent d4fb66290a
commit 2ac0357406
18 changed files with 429 additions and 153 deletions

View File

@ -12,6 +12,7 @@
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff
- [#4284](https://github.com/influxdb/influxdb/pull/4284): Add exponential backoff for hinted-handoff failures
- [#4310](https://github.com/influxdb/influxdb/pull/4310): Support dropping non-Raft nodes. Work mostly by @corylanou
### Bugfixes
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW

View File

@ -17,15 +17,17 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type WriteShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"`
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -48,8 +50,8 @@ func (m *WriteShardRequest) GetPoints() [][]byte {
}
type WriteShardResponse struct {
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -72,9 +74,9 @@ func (m *WriteShardResponse) GetMessage() string {
}
type MapShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"`
ChunkSize *int32 `protobuf:"varint,3,req" json:"ChunkSize,omitempty"`
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"`
ChunkSize *int32 `protobuf:"varint,3,req,name=ChunkSize" json:"ChunkSize,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -104,11 +106,11 @@ func (m *MapShardRequest) GetChunkSize() int32 {
}
type MapShardResponse struct {
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
Data []byte `protobuf:"bytes,3,opt" json:"Data,omitempty"`
TagSets []string `protobuf:"bytes,4,rep" json:"TagSets,omitempty"`
Fields []string `protobuf:"bytes,5,rep" json:"Fields,omitempty"`
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Data []byte `protobuf:"bytes,3,opt,name=Data" json:"Data,omitempty"`
TagSets []string `protobuf:"bytes,4,rep,name=TagSets" json:"TagSets,omitempty"`
Fields []string `protobuf:"bytes,5,rep,name=Fields" json:"Fields,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -150,6 +152,3 @@ func (m *MapShardResponse) GetFields() []string {
}
return nil
}
func init() {
}

View File

@ -92,6 +92,7 @@ func (*DropDatabaseStatement) node() {}
func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropServerStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
@ -198,6 +199,7 @@ func (*DropDatabaseStatement) stmt() {}
func (*DropMeasurementStatement) stmt() {}
func (*DropRetentionPolicyStatement) stmt() {}
func (*DropSeriesStatement) stmt() {}
func (*DropServerStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
@ -1824,6 +1826,30 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: WritePrivilege}}
}
// DropServerStatement represents a command for removing a server from the cluster.
type DropServerStatement struct {
// ID of the node to be dropped.
NodeID uint64
// Force will force the server to drop even it it means losing data
Force bool
}
// String returns a string representation of the drop series statement.
func (s *DropServerStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("DROP SERVER ")
_, _ = buf.WriteString(strconv.FormatUint(s.NodeID, 10))
if s.Force {
_, _ = buf.WriteString(" FORCE")
}
return buf.String()
}
// RequiredPrivileges returns the privilege required to execute a DropServerStatement.
func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// ShowContinuousQueriesStatement represents a command for listing continuous queries.
type ShowContinuousQueriesStatement struct{}

View File

@ -208,6 +208,8 @@ func (p *Parser) parseDropStatement() (Statement, error) {
return p.parseDropRetentionPolicyStatement()
} else if tok == USER {
return p.parseDropUserStatement()
} else if tok == SERVER {
return p.parseDropServerStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos)
@ -311,8 +313,8 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
// Parse optional DEFAULT token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT {
stmt.Default = true
} else {
p.unscan()
} else if tok != EOF && tok != SEMICOLON {
return nil, newParseError(tokstr(tok, lit), []string{"DEFAULT"}, pos)
}
return stmt, nil
@ -1178,6 +1180,27 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
return stmt, nil
}
// parseDropServerStatement parses a string and returns a DropServerStatement.
// This function assumes the "DROP SERVER" tokens have already been consumed.
func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) {
s := &DropServerStatement{}
var err error
// Parse the server's ID.
if s.NodeID, err = p.parseUInt64(); err != nil {
return nil, err
}
// Parse optional FORCE token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == FORCE {
s.Force = true
} else if tok != EOF && tok != SEMICOLON {
return nil, newParseError(tokstr(tok, lit), []string{"FORCE"}, pos)
}
return s, nil
}
// parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement.
// This function assumes the "SHOW CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) {

View File

@ -1013,6 +1013,16 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// DROP SERVER statement
{
s: `DROP SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123},
},
{
s: `DROP SERVER 123 FORCE`,
stmt: &influxql.DropServerStatement{NodeID: 123, Force: true},
},
// SHOW CONTINUOUS QUERIES statement
{
s: `SHOW CONTINUOUS QUERIES`,
@ -1453,15 +1463,15 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`},
{s: `SELECT top(field1) FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 1`},
{s: `SELECT top(field1,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`},
{s: `SELECT top(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`},
{s: `SELECT top(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`},
{s: `SELECT top(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`},
{s: `SELECT bottom() FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 0`},
{s: `SELECT bottom(field1) FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
{s: `SELECT bottom(field1,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`},
{s: `SELECT bottom(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`},
{s: `SELECT bottom(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`},
{s: `SELECT bottom(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`},
{s: `SELECT percentile() FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 0`},
{s: `SELECT percentile(field1) FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT percentile(field1, foo) FROM myseries`, err: `expected float argument in percentile()`},
@ -1515,6 +1525,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DROP SERIES`, err: `found EOF, expected FROM, WHERE at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERVER`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERVER abc`, err: `found abc, expected number at line 1, char 13`},
{s: `DROP SERVER 1 1`, err: `found 1, expected FORCE at line 1, char 15`},
{s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION ON`, err: `found ON, expected POLICIES at line 1, char 16`},
@ -1624,6 +1637,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `number must be an integer at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected number at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`},
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},

View File

@ -154,6 +154,8 @@ func TestScanner_Scan(t *testing.T) {
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},
{s: `SERIES`, tok: influxql.SERIES},
{s: `SERVER`, tok: influxql.SERVER},
{s: `SERVERS`, tok: influxql.SERVERS},
{s: `TAG`, tok: influxql.TAG},
{s: `TO`, tok: influxql.TO},
{s: `USER`, tok: influxql.USER},

View File

@ -77,6 +77,7 @@ const (
EXPLAIN
FIELD
FOR
FORCE
FROM
GRANT
GRANTS
@ -108,6 +109,7 @@ const (
REVOKE
SELECT
SERIES
SERVER
SERVERS
SET
SHOW
@ -187,6 +189,7 @@ var tokens = [...]string{
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",
FOR: "FOR",
FORCE: "FORCE",
FROM: "FROM",
GRANT: "GRANT",
GRANTS: "GRANTS",
@ -218,6 +221,7 @@ var tokens = [...]string{
REVOKE: "REVOKE",
SELECT: "SELECT",
SERIES: "SERIES",
SERVER: "SERVER",
SERVERS: "SERVERS",
SET: "SET",
SHOW: "SHOW",

View File

@ -74,14 +74,71 @@ func (data *Data) CreateNode(host string) error {
}
// DeleteNode removes a node from the metadata.
func (data *Data) DeleteNode(id uint64) error {
for i := range data.Nodes {
if data.Nodes[i].ID == id {
data.Nodes = append(data.Nodes[:i], data.Nodes[i+1:]...)
return nil
func (data *Data) DeleteNode(id uint64, force bool) error {
// Node has to be larger than 0 to be real
if id == 0 {
return ErrNodeIDRequired
}
// Is this a valid node?
nodeInfo := data.Node(id)
if nodeInfo == nil {
return ErrNodeNotFound
}
// Am I the only node? If so, nothing to do
if len(data.Nodes) == 1 {
return ErrNodeUnableToDropFinalNode
}
// Determine if there are any any non-replicated nodes and force was not specified
if !force {
for _, d := range data.Databases {
for _, rp := range d.RetentionPolicies {
// ignore replicated retention policies
if rp.ReplicaN > 1 {
continue
}
for _, sg := range rp.ShardGroups {
for _, s := range sg.Shards {
if s.OwnedBy(id) && len(s.Owners) == 1 {
return ErrShardNotReplicated
}
}
}
}
}
}
return ErrNodeNotFound
// Remove node id from all shard infos
for di, d := range data.Databases {
for ri, rp := range d.RetentionPolicies {
for sgi, sg := range rp.ShardGroups {
for si, s := range sg.Shards {
if s.OwnedBy(id) {
var owners []ShardOwner
for _, o := range s.Owners {
if o.NodeID != id {
owners = append(owners, o)
}
}
data.Databases[di].RetentionPolicies[ri].ShardGroups[sgi].Shards[si].Owners = owners
}
}
}
}
}
// Remove this node from the in memory nodes
var nodes []NodeInfo
for _, n := range data.Nodes {
if n.ID == id {
continue
}
nodes = append(nodes, n)
}
data.Nodes = nodes
return nil
}
// Database returns a database by name.

View File

@ -26,7 +26,7 @@ func TestData_CreateNode(t *testing.T) {
}
// Ensure a node can be removed.
func TestData_DeleteNode(t *testing.T) {
func TestData_DeleteNode_Basic(t *testing.T) {
var data meta.Data
if err := data.CreateNode("host0"); err != nil {
t.Fatal(err)
@ -36,7 +36,7 @@ func TestData_DeleteNode(t *testing.T) {
t.Fatal(err)
}
if err := data.DeleteNode(1); err != nil {
if err := data.DeleteNode(1, false); err != nil {
t.Fatal(err)
} else if len(data.Nodes) != 2 {
t.Fatalf("unexpected node count: %d", len(data.Nodes))
@ -47,6 +47,49 @@ func TestData_DeleteNode(t *testing.T) {
}
}
// Ensure a node can be removed with shard info in play
func TestData_DeleteNode_Shards(t *testing.T) {
var data meta.Data
if err := data.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if err = data.CreateNode("host1"); err != nil {
t.Fatal(err)
} else if err := data.CreateNode("host2"); err != nil {
t.Fatal(err)
} else if err := data.CreateNode("host3"); err != nil {
t.Fatal(err)
}
if err := data.CreateDatabase("mydb"); err != nil {
t.Fatal(err)
}
rpi := &meta.RetentionPolicyInfo{
Name: "myrp",
ReplicaN: 3,
}
if err := data.CreateRetentionPolicy("mydb", rpi); err != nil {
t.Fatal(err)
}
if err := data.CreateShardGroup("mydb", "myrp", time.Now()); err != nil {
t.Fatal(err)
}
if len(data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners) != 3 {
t.Fatal("wrong number of shard owners")
}
if err := data.DeleteNode(2, false); err != nil {
t.Fatal(err)
}
if got, exp := len(data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners), 2; exp != got {
t.Fatalf("wrong number of shard owners, got %d, exp %d", got, exp)
}
for _, s := range data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards {
if s.OwnedBy(2) {
t.Fatal("shard still owned by delted node")
}
}
}
// Ensure a database can be created.
func TestData_CreateDatabase(t *testing.T) {
var data meta.Data

View File

@ -26,6 +26,16 @@ var (
// ErrNodesRequired is returned when at least one node is required for an operation.
// This occurs when creating a shard group.
ErrNodesRequired = newError("at least one node required")
// ErrNodeIDRequired is returned when using a zero node id.
ErrNodeIDRequired = newError("node id must be greater than 0")
// ErrNodeUnableToDropSingleNode is returned if the node being dropped is the last
// node in the cluster
ErrNodeUnableToDropFinalNode = newError("unable to drop the final node in a cluster")
// ErrNodeRaft is returned when attempting an operation prohibted for a Raft-node.
ErrNodeRaft = newError("node is a Raft node")
)
var (
@ -73,6 +83,10 @@ var (
// ErrShardGroupNotFound is returned when mutating a shard group that doesn't exist.
ErrShardGroupNotFound = newError("shard group not found")
// ErrShardNotReplicated is returned if the node requested to be dropped has
// the last copy of a shard present and the force keyword was not used
ErrShardNotReplicated = newError("shard not replicated")
)
var (

View File

@ -50,10 +50,12 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type RPCType int32
@ -177,15 +179,15 @@ func (x *Command_Type) UnmarshalJSON(data []byte) error {
}
type Data struct {
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req" json:"MaxShardID,omitempty"`
Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -257,8 +259,8 @@ func (m *Data) GetMaxShardID() uint64 {
}
type NodeInfo struct {
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"`
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -281,10 +283,10 @@ func (m *NodeInfo) GetHost() string {
}
type DatabaseInfo struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
DefaultRetentionPolicy *string `protobuf:"bytes,2,req" json:"DefaultRetentionPolicy,omitempty"`
RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep" json:"RetentionPolicies,omitempty"`
ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep" json:"ContinuousQueries,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
DefaultRetentionPolicy *string `protobuf:"bytes,2,req,name=DefaultRetentionPolicy" json:"DefaultRetentionPolicy,omitempty"`
RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep,name=RetentionPolicies" json:"RetentionPolicies,omitempty"`
ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep,name=ContinuousQueries" json:"ContinuousQueries,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -321,11 +323,11 @@ func (m *DatabaseInfo) GetContinuousQueries() []*ContinuousQueryInfo {
}
type RetentionPolicyInfo struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Duration *int64 `protobuf:"varint,2,req" json:"Duration,omitempty"`
ShardGroupDuration *int64 `protobuf:"varint,3,req" json:"ShardGroupDuration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,4,req" json:"ReplicaN,omitempty"`
ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep" json:"ShardGroups,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Duration *int64 `protobuf:"varint,2,req,name=Duration" json:"Duration,omitempty"`
ShardGroupDuration *int64 `protobuf:"varint,3,req,name=ShardGroupDuration" json:"ShardGroupDuration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,4,req,name=ReplicaN" json:"ReplicaN,omitempty"`
ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep,name=ShardGroups" json:"ShardGroups,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -369,11 +371,11 @@ func (m *RetentionPolicyInfo) GetShardGroups() []*ShardGroupInfo {
}
type ShardGroupInfo struct {
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
StartTime *int64 `protobuf:"varint,2,req" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,3,req" json:"EndTime,omitempty"`
DeletedAt *int64 `protobuf:"varint,4,req" json:"DeletedAt,omitempty"`
Shards []*ShardInfo `protobuf:"bytes,5,rep" json:"Shards,omitempty"`
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
StartTime *int64 `protobuf:"varint,2,req,name=StartTime" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"`
DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"`
Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -417,9 +419,9 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo {
}
type ShardInfo struct {
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
OwnerIDs []uint64 `protobuf:"varint,2,rep" json:"OwnerIDs,omitempty"`
Owners []*ShardOwner `protobuf:"bytes,3,rep" json:"Owners,omitempty"`
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"`
Owners []*ShardOwner `protobuf:"bytes,3,rep,name=Owners" json:"Owners,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -449,7 +451,7 @@ func (m *ShardInfo) GetOwners() []*ShardOwner {
}
type ShardOwner struct {
NodeID *uint64 `protobuf:"varint,1,req" json:"NodeID,omitempty"`
NodeID *uint64 `protobuf:"varint,1,req,name=NodeID" json:"NodeID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -465,8 +467,8 @@ func (m *ShardOwner) GetNodeID() uint64 {
}
type ContinuousQueryInfo struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -489,10 +491,10 @@ func (m *ContinuousQueryInfo) GetQuery() string {
}
type UserInfo struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"`
Privileges []*UserPrivilege `protobuf:"bytes,4,rep" json:"Privileges,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"`
Privileges []*UserPrivilege `protobuf:"bytes,4,rep,name=Privileges" json:"Privileges,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -529,8 +531,8 @@ func (m *UserInfo) GetPrivileges() []*UserPrivilege {
}
type UserPrivilege struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,2,req" json:"Privilege,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,2,req,name=Privilege" json:"Privilege,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -584,8 +586,8 @@ func (m *Command) GetType() Command_Type {
}
type CreateNodeCommand struct {
Host *string `protobuf:"bytes,1,req" json:"Host,omitempty"`
Rand *uint64 `protobuf:"varint,2,req" json:"Rand,omitempty"`
Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"`
Rand *uint64 `protobuf:"varint,2,req,name=Rand" json:"Rand,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -616,7 +618,8 @@ var E_CreateNodeCommand_Command = &proto.ExtensionDesc{
}
type DeleteNodeCommand struct {
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Force *bool `protobuf:"varint,2,req,name=Force" json:"Force,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -631,6 +634,13 @@ func (m *DeleteNodeCommand) GetID() uint64 {
return 0
}
func (m *DeleteNodeCommand) GetForce() bool {
if m != nil && m.Force != nil {
return *m.Force
}
return false
}
var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteNodeCommand)(nil),
@ -640,7 +650,7 @@ var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{
}
type CreateDatabaseCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -664,7 +674,7 @@ var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{
}
type DropDatabaseCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -688,8 +698,8 @@ var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{
}
type CreateRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -720,8 +730,8 @@ var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type DropRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -752,8 +762,8 @@ var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type SetDefaultRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -784,11 +794,11 @@ var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type UpdateRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
NewName *string `protobuf:"bytes,3,opt" json:"NewName,omitempty"`
Duration *int64 `protobuf:"varint,4,opt" json:"Duration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,5,opt" json:"ReplicaN,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
NewName *string `protobuf:"bytes,3,opt,name=NewName" json:"NewName,omitempty"`
Duration *int64 `protobuf:"varint,4,opt,name=Duration" json:"Duration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,5,opt,name=ReplicaN" json:"ReplicaN,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -840,9 +850,9 @@ var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type CreateShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"`
Timestamp *int64 `protobuf:"varint,3,req,name=Timestamp" json:"Timestamp,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -880,9 +890,9 @@ var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{
}
type DeleteShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"`
ShardGroupID *uint64 `protobuf:"varint,3,req,name=ShardGroupID" json:"ShardGroupID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -920,9 +930,9 @@ var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{
}
type CreateContinuousQueryCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Query *string `protobuf:"bytes,3,req" json:"Query,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Query *string `protobuf:"bytes,3,req,name=Query" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -960,8 +970,8 @@ var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{
}
type DropContinuousQueryCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -992,9 +1002,9 @@ var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{
}
type CreateUserCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1032,7 +1042,7 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{
}
type DropUserCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1056,8 +1066,8 @@ var E_DropUserCommand_Command = &proto.ExtensionDesc{
}
type UpdateUserCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1088,9 +1098,9 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{
}
type SetPrivilegeCommand struct {
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,3,req" json:"Privilege,omitempty"`
Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,3,req,name=Privilege" json:"Privilege,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1128,7 +1138,7 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{
}
type SetDataCommand struct {
Data *Data `protobuf:"bytes,1,req" json:"Data,omitempty"`
Data *Data `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1152,8 +1162,8 @@ var E_SetDataCommand_Command = &proto.ExtensionDesc{
}
type SetAdminPrivilegeCommand struct {
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Admin *bool `protobuf:"varint,2,req" json:"Admin,omitempty"`
Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"`
Admin *bool `protobuf:"varint,2,req,name=Admin" json:"Admin,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1184,8 +1194,8 @@ var E_SetAdminPrivilegeCommand_Command = &proto.ExtensionDesc{
}
type UpdateNodeCommand struct {
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"`
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1216,9 +1226,9 @@ var E_UpdateNodeCommand_Command = &proto.ExtensionDesc{
}
type Response struct {
OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"`
Index *uint64 `protobuf:"varint,3,opt" json:"Index,omitempty"`
OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
Index *uint64 `protobuf:"varint,3,opt,name=Index" json:"Index,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1248,8 +1258,8 @@ func (m *Response) GetIndex() uint64 {
}
type ResponseHeader struct {
OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"`
OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1272,7 +1282,7 @@ func (m *ResponseHeader) GetError() string {
}
type ErrorResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1288,9 +1298,9 @@ func (m *ErrorResponse) GetHeader() *ResponseHeader {
}
type FetchDataRequest struct {
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
Blocking *bool `protobuf:"varint,3,opt,def=0" json:"Blocking,omitempty"`
Index *uint64 `protobuf:"varint,1,req,name=Index" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req,name=Term" json:"Term,omitempty"`
Blocking *bool `protobuf:"varint,3,opt,name=Blocking,def=0" json:"Blocking,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1322,10 +1332,10 @@ func (m *FetchDataRequest) GetBlocking() bool {
}
type FetchDataResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,3,req" json:"Term,omitempty"`
Data []byte `protobuf:"bytes,4,opt" json:"Data,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,3,req,name=Term" json:"Term,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1362,7 +1372,7 @@ func (m *FetchDataResponse) GetData() []byte {
}
type JoinRequest struct {
Addr *string `protobuf:"bytes,1,req" json:"Addr,omitempty"`
Addr *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1378,14 +1388,14 @@ func (m *JoinRequest) GetAddr() string {
}
type JoinResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
// Indicates that this node should take part in the raft cluster.
EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"`
EnableRaft *bool `protobuf:"varint,2,opt,name=EnableRaft" json:"EnableRaft,omitempty"`
// The addresses of raft peers to use if joining as a raft member. If not joining
// as a raft member, these are the nodes running raft.
RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"`
RaftNodes []string `protobuf:"bytes,3,rep,name=RaftNodes" json:"RaftNodes,omitempty"`
// The node ID assigned to the requesting node.
NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"`
NodeID *uint64 `protobuf:"varint,4,opt,name=NodeID" json:"NodeID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}

View File

@ -123,6 +123,7 @@ message DeleteNodeCommand {
optional DeleteNodeCommand command = 102;
}
required uint64 ID = 1;
required bool Force = 2;
}
message CreateDatabaseCommand {

View File

@ -13,9 +13,12 @@ import (
// StatementExecutor translates InfluxQL queries to meta store methods.
type StatementExecutor struct {
Store interface {
Node(id uint64) (ni *NodeInfo, err error)
Nodes() ([]NodeInfo, error)
Peers() ([]string, error)
Leader() string
DeleteNode(nodeID uint64, force bool) error
Database(name string) (*DatabaseInfo, error)
Databases() ([]DatabaseInfo, error)
CreateDatabase(name string) (*DatabaseInfo, error)
@ -88,6 +91,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.
return e.executeShowShardsStatement(stmt)
case *influxql.ShowStatsStatement:
return e.executeShowStatsStatement(stmt)
case *influxql.DropServerStatement:
return e.executeDropServerStatement(stmt)
default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
@ -142,13 +147,33 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
return &influxql.Result{Err: err}
}
row := &models.Row{Columns: []string{"id", "cluster_addr", "raft"}}
leader := e.Store.Leader()
row := &models.Row{Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}}
for _, ni := range nis {
row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host)})
row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host), leader == ni.Host})
}
return &influxql.Result{Series: []*models.Row{row}}
}
func (e *StatementExecutor) executeDropServerStatement(q *influxql.DropServerStatement) *influxql.Result {
// Dropping only non-Raft nodes supported.
peers, err := e.Store.Peers()
if err != nil {
return &influxql.Result{Err: err}
}
ni, err := e.Store.Node(q.NodeID)
if err != nil {
return &influxql.Result{Err: err}
}
if contains(peers, ni.Host) {
return &influxql.Result{Err: ErrNodeRaft}
}
err = e.Store.DeleteNode(q.NodeID, q.Force)
return &influxql.Result{Err: err}
}
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result {
_, err := e.Store.CreateUser(q.Name, q.Password, q.Admin)
return &influxql.Result{Err: err}

View File

@ -125,15 +125,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
e.Store.PeersFn = func() ([]string, error) {
return []string{"node0"}, nil
}
e.Store.LeaderFn = func() string {
return "node0"
}
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
t.Fatal(res.Err)
} else if !reflect.DeepEqual(res.Series, models.Rows{
{
Columns: []string{"id", "cluster_addr", "raft"},
Columns: []string{"id", "cluster_addr", "raft", "raft-leader"},
Values: [][]interface{}{
{uint64(1), "node0", true},
{uint64(2), "node1", false},
{uint64(1), "node0", true, true},
{uint64(2), "node1", false, false},
},
},
}) {
@ -141,6 +144,35 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
}
}
// Ensure a DROP SERVER statement can be executed.
func TestStatementExecutor_ExecuteStatement_DropServer(t *testing.T) {
e := NewStatementExecutor()
e.Store.NodeFn = func(id uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{
ID: 1, Host: "node1",
}, nil
}
// Ensure Raft nodes cannot be dropped.
e.Store.PeersFn = func() ([]string, error) {
return []string{"node1"}, nil
}
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP SERVER 1`)); res.Err != meta.ErrNodeRaft {
t.Fatalf("unexpected error: %s", res.Err)
}
// Ensure non-Raft nodes can be dropped.
e.Store.PeersFn = func() ([]string, error) {
return []string{"node2"}, nil
}
e.Store.DeleteNodeFn = func(id uint64, force bool) error {
return nil
}
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP SERVER 1`)); res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
}
}
// Ensure a SHOW SERVERS statement returns errors from the store.
func TestStatementExecutor_ExecuteStatement_ShowServers_Err(t *testing.T) {
e := NewStatementExecutor()
@ -832,12 +864,15 @@ func NewStatementExecutor() *StatementExecutor {
// StatementExecutorStore represents a mock implementation of StatementExecutor.Store.
type StatementExecutorStore struct {
NodeFn func(id uint64) (*meta.NodeInfo, error)
NodesFn func() ([]meta.NodeInfo, error)
PeersFn func() ([]string, error)
LeaderFn func() string
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
DatabasesFn func() ([]meta.DatabaseInfo, error)
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
DropDatabaseFn func(name string) error
DeleteNodeFn func(nodeID uint64, force bool) error
DefaultRetentionPolicyFn func(database string) (*meta.RetentionPolicyInfo, error)
CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error
@ -856,6 +891,10 @@ type StatementExecutorStore struct {
DropContinuousQueryFn func(database, name string) error
}
func (s *StatementExecutorStore) Node(id uint64) (*meta.NodeInfo, error) {
return s.NodeFn(id)
}
func (s *StatementExecutorStore) Nodes() ([]meta.NodeInfo, error) {
return s.NodesFn()
}
@ -864,6 +903,17 @@ func (s *StatementExecutorStore) Peers() ([]string, error) {
return s.PeersFn()
}
func (s *StatementExecutorStore) Leader() string {
if s.LeaderFn != nil {
return s.LeaderFn()
}
return ""
}
func (s *StatementExecutorStore) DeleteNode(nodeID uint64, force bool) error {
return s.DeleteNodeFn(nodeID, force)
}
func (s *StatementExecutorStore) Database(name string) (*meta.DatabaseInfo, error) {
return s.DatabaseFn(name)
}

View File

@ -823,10 +823,16 @@ func (s *Store) UpdateNode(id uint64, host string) (*NodeInfo, error) {
}
// DeleteNode removes a node from the metastore by id.
func (s *Store) DeleteNode(id uint64) error {
func (s *Store) DeleteNode(id uint64, force bool) error {
ni := s.data.Node(id)
if ni == nil {
return ErrNodeNotFound
}
return s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command,
&internal.DeleteNodeCommand{
ID: proto.Uint64(id),
ID: proto.Uint64(id),
Force: proto.Bool(force),
},
)
}
@ -1706,11 +1712,14 @@ func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteNode(v.GetID()); err != nil {
if err := other.DeleteNode(v.GetID(), v.GetForce()); err != nil {
return err
}
fsm.data = other
id := v.GetID()
fsm.Logger.Printf("node '%d' removed", id)
return nil
}

View File

@ -151,7 +151,7 @@ func TestStore_DeleteNode(t *testing.T) {
}
// Remove second node.
if err := s.DeleteNode(3); err != nil {
if err := s.DeleteNode(3, false); err != nil {
t.Fatal(err)
}
@ -173,7 +173,7 @@ func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) {
s := MustOpenStore()
defer s.Close()
if err := s.DeleteNode(2); err != meta.ErrNodeNotFound {
if err := s.DeleteNode(2, false); err != meta.ErrNodeNotFound {
t.Fatalf("unexpected error: %s", err)
}
}

View File

@ -15,14 +15,16 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Request struct {
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -38,7 +40,7 @@ func (m *Request) GetShardID() uint64 {
}
type Response struct {
Error *string `protobuf:"bytes,1,opt" json:"Error,omitempty"`
Error *string `protobuf:"bytes,1,opt,name=Error" json:"Error,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -52,6 +54,3 @@ func (m *Response) GetError() string {
}
return ""
}
func init() {
}

View File

@ -17,15 +17,17 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Series struct {
Key *string `protobuf:"bytes,1,req" json:"Key,omitempty"`
Tags []*Tag `protobuf:"bytes,2,rep" json:"Tags,omitempty"`
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"`
Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -48,8 +50,8 @@ func (m *Series) GetTags() []*Tag {
}
type Tag struct {
Key *string `protobuf:"bytes,1,req" json:"Key,omitempty"`
Value *string `protobuf:"bytes,2,req" json:"Value,omitempty"`
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"`
Value *string `protobuf:"bytes,2,req,name=Value" json:"Value,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -72,7 +74,7 @@ func (m *Tag) GetValue() string {
}
type MeasurementFields struct {
Fields []*Field `protobuf:"bytes,1,rep" json:"Fields,omitempty"`
Fields []*Field `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -88,9 +90,9 @@ func (m *MeasurementFields) GetFields() []*Field {
}
type Field struct {
ID *int32 `protobuf:"varint,1,req" json:"ID,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Type *int32 `protobuf:"varint,3,req" json:"Type,omitempty"`
ID *int32 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Type *int32 `protobuf:"varint,3,req,name=Type" json:"Type,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -118,6 +120,3 @@ func (m *Field) GetType() int32 {
}
return 0
}
func init() {
}