Implement meta.Store and meta.Data.

pull/2654/head
Ben Johnson 2015-05-21 14:06:01 -06:00
parent f861159e8e
commit 7258a9be3c
7 changed files with 2364 additions and 597 deletions

View File

@ -1,7 +1,6 @@
package meta
import (
"errors"
"time"
"github.com/gogo/protobuf/proto"
@ -11,35 +10,27 @@ import (
//go:generate protoc --gogo_out=. internal/meta.proto
var (
// ErrNodeExists is returned when creating a node that already exists.
ErrNodeExists = errors.New("node already exists")
)
// Data represents the top level collection of all metadata.
type Data struct {
Version uint64 // autoincrementing version
MaxNodeID uint64
Nodes []NodeInfo
Databases []DatabaseInfo
Users []UserInfo
Version uint64 // autoincrementing version
Nodes []NodeInfo
Databases []DatabaseInfo
Users []UserInfo
ContinuousQueries []ContinuousQueryInfo
MaxNodeID uint64
MaxShardGroupID uint64
MaxShardID uint64
}
// CreateNode returns a new instance of Data with a new node.
func (data *Data) CreateNode(host string) (*Data, error) {
if data.NodeByHost(host) != nil {
return nil, ErrNodeExists
// Node returns a node by id.
func (data *Data) Node(id uint64) *NodeInfo {
for i := range data.Nodes {
if data.Nodes[i].ID == id {
return &data.Nodes[i]
}
}
// Clone and append new node.
other := data.Clone()
other.MaxNodeID++
other.Nodes = append(other.Nodes, NodeInfo{
ID: other.MaxNodeID,
Host: host,
})
return other, nil
return nil
}
// NodeByHost returns a node by hostname.
@ -52,29 +43,395 @@ func (data *Data) NodeByHost(host string) *NodeInfo {
return nil
}
// CreateNode adds a node to the metadata.
func (data *Data) CreateNode(host string) error {
// Ensure a node with the same host doesn't already exist.
if data.NodeByHost(host) != nil {
return ErrNodeExists
}
// Append new node.
data.MaxNodeID++
data.Nodes = append(data.Nodes, NodeInfo{
ID: data.MaxNodeID,
Host: host,
})
return nil
}
// DeleteNode removes a node from the metadata.
func (data *Data) DeleteNode(id uint64) error {
for i := range data.Nodes {
if data.Nodes[i].ID == id {
data.Nodes = append(data.Nodes[:i], data.Nodes[i+1:]...)
return nil
}
}
return ErrNodeNotFound
}
// Database returns a database by name.
func (data *Data) Database(name string) *DatabaseInfo {
for i := range data.Databases {
if data.Databases[i].Name == name {
return &data.Databases[i]
}
}
return nil
}
// CreateDatabase creates a new database.
// Returns an error if name is blank or if a database with the same name already exists.
func (data *Data) CreateDatabase(name string) error {
if name == "" {
return ErrDatabaseNameRequired
} else if data.Database(name) != nil {
return ErrDatabaseExists
}
// Append new node.
data.Databases = append(data.Databases, DatabaseInfo{Name: name})
return nil
}
// DropDatabase removes a database by name.
func (data *Data) DropDatabase(name string) error {
for i := range data.Databases {
if data.Databases[i].Name == name {
data.Databases = append(data.Databases[:i], data.Databases[i+1:]...)
return nil
}
}
return ErrDatabaseNotFound
}
// RetentionPolicy returns a retention policy for a database by name.
func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) {
di := data.Database(database)
if di == nil {
return nil, ErrDatabaseNotFound
}
for i := range di.RetentionPolicies {
if di.RetentionPolicies[i].Name == name {
return &di.RetentionPolicies[i], nil
}
}
return nil, nil
}
// CreateRetentionPolicy creates a new retention policy on a database.
// Returns an error if name is blank or if a database does not exist.
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) error {
// Validate retention policy.
if rpi.Name == "" {
return ErrRetentionPolicyNameRequired
}
// Find database.
di := data.Database(database)
if di == nil {
return ErrDatabaseNotFound
} else if di.RetentionPolicy(rpi.Name) != nil {
return ErrRetentionPolicyExists
}
// Append new policy.
di.RetentionPolicies = append(di.RetentionPolicies, RetentionPolicyInfo{
Name: rpi.Name,
Duration: rpi.Duration,
ShardGroupDuration: shardGroupDuration(rpi.Duration),
ReplicaN: rpi.ReplicaN,
})
return nil
}
// DropRetentionPolicy removes a retention policy from a database by name.
func (data *Data) DropRetentionPolicy(database, name string) error {
// Find database.
di := data.Database(database)
if di == nil {
return ErrDatabaseNotFound
}
// Remove from list.
for i := range di.RetentionPolicies {
if di.RetentionPolicies[i].Name == name {
di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...)
return nil
}
}
return ErrRetentionPolicyNotFound
}
// UpdateRetentionPolicy updates an existing retention policy.
func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
// Find database.
di := data.Database(database)
if di == nil {
return ErrDatabaseNotFound
}
// Find policy.
rpi := di.RetentionPolicy(name)
if rpi == nil {
return ErrRetentionPolicyNotFound
}
// Ensure new policy doesn't match an existing policy.
if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil {
return ErrRetentionPolicyNameExists
}
// Update fields.
if rpu.Name != nil {
rpi.Name = *rpu.Name
}
if rpu.Duration != nil {
rpi.Duration = *rpu.Duration
}
if rpu.ReplicaN != nil {
rpi.ReplicaN = *rpu.ReplicaN
}
return nil
}
// SetDefaultRetentionPolicy sets the default retention policy for a database.
func (data *Data) SetDefaultRetentionPolicy(database, name string) error {
// Find database and verify policy exists.
di := data.Database(database)
if di == nil {
return ErrDatabaseNotFound
} else if di.RetentionPolicy(name) == nil {
return ErrRetentionPolicyNotFound
}
// Set default policy.
di.DefaultRetentionPolicy = name
return nil
}
// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return nil, err
} else if rpi == nil {
return nil, ErrRetentionPolicyNotFound
}
return rpi.ShardGroupByTimestamp(timestamp), nil
}
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
// Ensure there are nodes in the metadata.
if len(data.Nodes) == 0 {
return ErrNodesRequired
}
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return err
} else if rpi == nil {
return ErrRetentionPolicyNotFound
}
// Verify that shard group doesn't already exist for this timestamp.
if rpi.ShardGroupByTimestamp(timestamp) != nil {
return ErrShardGroupExists
}
// Require at least one replica but no more replicas than nodes.
replicaN := rpi.ReplicaN
if replicaN == 0 {
replicaN = 1
} else if replicaN > len(data.Nodes) {
replicaN = len(data.Nodes)
}
// Determine shard count by node count divided by replication factor.
// This will ensure nodes will get distributed across nodes evenly and
// replicated the correct number of times.
shardN := len(data.Nodes) / replicaN
// Create the shard group.
data.MaxShardGroupID++
sgi := ShardGroupInfo{}
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
// Create shards on the group.
sgi.Shards = make([]ShardInfo, shardN)
for i := range sgi.Shards {
data.MaxShardID++
sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}
}
// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(data.Version % uint64(len(data.Nodes)))
for i := range sgi.Shards {
si := &sgi.Shards[i]
for j := 0; j < replicaN; j++ {
nodeID := data.Nodes[nodeIndex%len(data.Nodes)].ID
si.OwnerIDs = append(si.OwnerIDs, nodeID)
nodeIndex++
}
}
// Retention policy has a new shard group, so update the policy.
rpi.ShardGroups = append(rpi.ShardGroups, sgi)
return nil
}
// DeleteShardGroup removes a shard group from a database and retention policy by id.
func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return err
} else if rpi == nil {
return ErrRetentionPolicyNotFound
}
// Find shard group by ID and remove it.
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].ID == id {
rpi.ShardGroups = append(rpi.ShardGroups[:i], rpi.ShardGroups[i+1:]...)
return nil
}
}
return ErrShardGroupNotFound
}
// CreateContinuousQuery adds a continuous query.
func (data *Data) CreateContinuousQuery(query string) error {
// Ensure the query doesn't already exist.
for i := range data.ContinuousQueries {
if data.ContinuousQueries[i].Query == query {
return ErrContinuousQueryExists
}
}
// Append new query.
data.ContinuousQueries = append(data.ContinuousQueries, ContinuousQueryInfo{
Query: query,
})
return nil
}
// DropContinuousQuery removes a continuous query.
func (data *Data) DropContinuousQuery(query string) error {
for i := range data.ContinuousQueries {
if data.ContinuousQueries[i].Query == query {
data.ContinuousQueries = append(data.ContinuousQueries[:i], data.ContinuousQueries[i+1:]...)
return nil
}
}
return ErrContinuousQueryNotFound
}
// User returns a user by username.
func (data *Data) User(username string) *UserInfo {
for i := range data.Users {
if data.Users[i].Name == username {
return &data.Users[i]
}
}
return nil
}
// CreateUser creates a new user.
func (data *Data) CreateUser(name, hash string, admin bool) error {
// Ensure the user doesn't already exist.
if name == "" {
return ErrUsernameRequired
} else if data.User(name) != nil {
return ErrUserExists
}
// Append new user.
data.Users = append(data.Users, UserInfo{
Name: name,
Hash: hash,
Admin: admin,
})
return nil
}
// DropUser removes an existing user by name.
func (data *Data) DropUser(name string) error {
for i := range data.Users {
if data.Users[i].Name == name {
data.Users = append(data.Users[:i], data.Users[i+1:]...)
return nil
}
}
return ErrUserNotFound
}
// UpdateUser updates the password hash of an existing user.
func (data *Data) UpdateUser(name, hash string) error {
for i := range data.Users {
if data.Users[i].Name == name {
data.Users[i].Hash = hash
return nil
}
}
return ErrUserNotFound
}
// Clone returns a copy of data with a new version.
func (data *Data) Clone() *Data {
other := &Data{Version: data.Version + 1}
other := *data
other.Version++
// Copy nodes.
other.Nodes = make([]NodeInfo, len(data.Nodes))
for i := range data.Nodes {
other.Nodes[i] = data.Nodes[i].Clone()
if data.Nodes != nil {
other.Nodes = make([]NodeInfo, len(data.Nodes))
for i := range data.Nodes {
other.Nodes[i] = data.Nodes[i].clone()
}
}
// Deep copy databases.
other.Databases = make([]DatabaseInfo, len(data.Databases))
for i := range data.Databases {
other.Databases[i] = data.Databases[i].Clone()
if data.Databases != nil {
other.Databases = make([]DatabaseInfo, len(data.Databases))
for i := range data.Databases {
other.Databases[i] = data.Databases[i].clone()
}
}
// Copy continuous queries.
if data.ContinuousQueries != nil {
other.ContinuousQueries = make([]ContinuousQueryInfo, len(data.ContinuousQueries))
for i := range data.ContinuousQueries {
other.ContinuousQueries[i] = data.ContinuousQueries[i].clone()
}
}
// Copy users.
other.Users = make([]UserInfo, len(data.Users))
for i := range data.Users {
other.Users[i] = data.Users[i].Clone()
if data.Users != nil {
other.Users = make([]UserInfo, len(data.Users))
for i := range data.Users {
other.Users[i] = data.Users[i].clone()
}
}
return other
return &other
}
// NodeInfo represents information about a single node in the cluster.
@ -83,8 +440,8 @@ type NodeInfo struct {
Host string
}
// Clone returns a deep copy of ni.
func (ni NodeInfo) Clone() NodeInfo { return ni }
// clone returns a deep copy of ni.
func (ni NodeInfo) clone() NodeInfo { return ni }
// MarshalBinary encodes the object to a binary format.
func (info *NodeInfo) MarshalBinary() ([]byte, error) {
@ -109,32 +466,33 @@ func (info *NodeInfo) UnmarshalBinary(buf []byte) error {
type DatabaseInfo struct {
Name string
DefaultRetentionPolicy string
Policies []RetentionPolicyInfo
ContinuousQueries []ContinuousQueryInfo
RetentionPolicies []RetentionPolicyInfo
}
// Clone returns a deep copy of di.
func (di DatabaseInfo) Clone() DatabaseInfo {
// RetentionPolicy returns a retention policy by name.
func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo {
for i := range di.RetentionPolicies {
if di.RetentionPolicies[i].Name == name {
return &di.RetentionPolicies[i]
}
}
return nil
}
// clone returns a deep copy of di.
func (di DatabaseInfo) clone() DatabaseInfo {
other := di
other.Policies = make([]RetentionPolicyInfo, len(di.Policies))
for i := range di.Policies {
other.Policies[i] = di.Policies[i].Clone()
}
other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries))
for i := range di.ContinuousQueries {
other.ContinuousQueries[i] = di.ContinuousQueries[i].Clone()
if di.RetentionPolicies != nil {
other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies))
for i := range di.RetentionPolicies {
other.RetentionPolicies[i] = di.RetentionPolicies[i].clone()
}
}
return other
}
// RetentionPolicy returns a policy on the database by name.
func (db *DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo {
panic("not yet implemented")
}
// RetentionPolicyInfo represents metadata about a retention policy.
type RetentionPolicyInfo struct {
Name string
@ -144,18 +502,50 @@ type RetentionPolicyInfo struct {
ShardGroups []ShardGroupInfo
}
// Clone returns a deep copy of rpi.
func (rpi RetentionPolicyInfo) Clone() RetentionPolicyInfo {
// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp.
func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].Contains(timestamp) {
return &rpi.ShardGroups[i]
}
}
return nil
}
// protobuf returns a protocol buffers object.
func (rpi *RetentionPolicyInfo) protobuf() *internal.RetentionPolicyInfo {
return &internal.RetentionPolicyInfo{
Name: proto.String(rpi.Name),
ReplicaN: proto.Uint32(uint32(rpi.ReplicaN)),
Duration: proto.Int64(int64(rpi.Duration)),
ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)),
}
}
// clone returns a deep copy of rpi.
func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo {
other := rpi
other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups))
for i := range rpi.ShardGroups {
other.ShardGroups[i] = rpi.ShardGroups[i].Clone()
if rpi.ShardGroups != nil {
other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups))
for i := range rpi.ShardGroups {
other.ShardGroups[i] = rpi.ShardGroups[i].clone()
}
}
return other
}
// shardGroupDuration returns the duration for a shard group based on a policy duration.
func shardGroupDuration(d time.Duration) time.Duration {
if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
return 7 * 24 * time.Hour
} else if d >= 2*24*time.Hour { // 2 days
return 1 * 24 * time.Hour
}
return 1 * time.Hour
}
// ShardGroupInfo represents metadata about a shard group.
type ShardGroupInfo struct {
ID uint64
@ -164,13 +554,20 @@ type ShardGroupInfo struct {
Shards []ShardInfo
}
// Clone returns a deep copy of sgi.
func (sgi ShardGroupInfo) Clone() ShardGroupInfo {
// Contains return true if the shard group contains data for the timestamp.
func (sgi *ShardGroupInfo) Contains(timestamp time.Time) bool {
return !sgi.StartTime.After(timestamp) && sgi.EndTime.After(timestamp)
}
// clone returns a deep copy of sgi.
func (sgi ShardGroupInfo) clone() ShardGroupInfo {
other := sgi
other.Shards = make([]ShardInfo, len(sgi.Shards))
for i := range sgi.Shards {
other.Shards[i] = sgi.Shards[i].Clone()
if sgi.Shards != nil {
other.Shards = make([]ShardInfo, len(sgi.Shards))
for i := range sgi.Shards {
other.Shards[i] = sgi.Shards[i].clone()
}
}
return other
@ -187,12 +584,14 @@ type ShardInfo struct {
OwnerIDs []uint64
}
// Clone returns a deep copy of si.
func (si ShardInfo) Clone() ShardInfo {
// clone returns a deep copy of si.
func (si ShardInfo) clone() ShardInfo {
other := si
other.OwnerIDs = make([]uint64, len(si.OwnerIDs))
copy(other.OwnerIDs, si.OwnerIDs)
if si.OwnerIDs != nil {
other.OwnerIDs = make([]uint64, len(si.OwnerIDs))
copy(other.OwnerIDs, si.OwnerIDs)
}
return other
}
@ -202,8 +601,8 @@ type ContinuousQueryInfo struct {
Query string
}
// Clone returns a deep copy of cqi.
func (cqi ContinuousQueryInfo) Clone() ContinuousQueryInfo { return cqi }
// clone returns a deep copy of cqi.
func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi }
// UserInfo represents metadata about a user in the system.
type UserInfo struct {
@ -213,13 +612,15 @@ type UserInfo struct {
Privileges map[string]influxql.Privilege
}
// Clone returns a deep copy of si.
func (ui UserInfo) Clone() UserInfo {
// clone returns a deep copy of si.
func (ui UserInfo) clone() UserInfo {
other := ui
other.Privileges = make(map[string]influxql.Privilege)
for k, v := range ui.Privileges {
other.Privileges[k] = v
if ui.Privileges != nil {
other.Privileges = make(map[string]influxql.Privilege)
for k, v := range ui.Privileges {
other.Privileges[k] = v
}
}
return other

View File

@ -1,6 +1,9 @@
package meta_test
// import "github.com/davecgh/go-spew/spew"
import (
"fmt"
"reflect"
"testing"
"time"
@ -9,6 +12,421 @@ import (
"github.com/influxdb/influxdb/meta"
)
// Ensure a node can be created.
func TestData_CreateNode(t *testing.T) {
var data meta.Data
if err := data.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Nodes, []meta.NodeInfo{{ID: 1, Host: "host0"}}) {
t.Fatalf("unexpected node: %#v", data.Nodes[0])
}
}
// Ensure a node can be removed.
func TestData_DeleteNode(t *testing.T) {
var data meta.Data
if err := data.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if err = data.CreateNode("host1"); err != nil {
t.Fatal(err)
} else if err := data.CreateNode("host2"); err != nil {
t.Fatal(err)
}
if err := data.DeleteNode(1); err != nil {
t.Fatal(err)
} else if len(data.Nodes) != 2 {
t.Fatalf("unexpected node count: %d", len(data.Nodes))
} else if data.Nodes[0] != (meta.NodeInfo{ID: 2, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", data.Nodes[0])
} else if data.Nodes[1] != (meta.NodeInfo{ID: 3, Host: "host2"}) {
t.Fatalf("unexpected node: %#v", data.Nodes[1])
}
}
// Ensure a database can be created.
func TestData_CreateDatabase(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Databases, []meta.DatabaseInfo{{Name: "db0"}}) {
t.Fatalf("unexpected databases: %#v", data.Databases)
}
}
// Ensure that creating a database without a name returns an error.
func TestData_CreateDatabase_ErrNameRequired(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase(""); err != meta.ErrDatabaseNameRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that creating an already existing database returns an error.
func TestData_CreateDatabase_ErrDatabaseExists(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
if err := data.CreateDatabase("db0"); err != meta.ErrDatabaseExists {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure a database can be removed.
func TestData_DropDatabase(t *testing.T) {
var data meta.Data
for i := 0; i < 3; i++ {
if err := data.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
t.Fatal(err)
}
}
if err := data.DropDatabase("db1"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Databases, []meta.DatabaseInfo{{Name: "db0"}, {Name: "db2"}}) {
t.Fatalf("unexpected databases: %#v", data.Databases)
}
}
// Ensure a retention policy can be created.
func TestData_CreateRetentionPolicy(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
// Create policy.
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
ReplicaN: 2,
Duration: 4 * time.Hour,
}); err != nil {
t.Fatal(err)
}
// Verify policy exists.
if !reflect.DeepEqual(data.Databases[0].RetentionPolicies, []meta.RetentionPolicyInfo{
{
Name: "rp0",
ReplicaN: 2,
Duration: 4 * time.Hour,
ShardGroupDuration: 1 * time.Hour,
},
}) {
t.Fatalf("unexpected policies: %#v", data.Databases[0].RetentionPolicies)
}
}
// Ensure that creating a policy without a name returns an error.
func TestData_CreateRetentionPolicy_ErrNameRequired(t *testing.T) {
var data meta.Data
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: ""}); err != meta.ErrRetentionPolicyNameRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that creating a retention policy on a non-existent database returns an error.
func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
var data meta.Data
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that creating an already existing policy returns an error.
func TestData_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrRetentionPolicyExists {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a retention policy can be updated.
func TestData_UpdateRetentionPolicy(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
// Update the policy.
var rpu meta.RetentionPolicyUpdate
rpu.SetName("rp1")
rpu.SetDuration(10 * time.Hour)
rpu.SetReplicaN(3)
if err := data.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil {
t.Fatal(err)
}
// Verify the policy was changed.
if rpi, _ := data.RetentionPolicy("db0", "rp1"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{
Name: "rp1",
Duration: 10 * time.Hour,
ShardGroupDuration: 604800000000000,
ReplicaN: 3,
}) {
t.Fatalf("unexpected policy: %#v", rpi)
}
}
// Ensure a retention policy can be removed.
func TestData_DropRetentionPolicy(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
if err := data.DropRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
} else if len(data.Databases[0].RetentionPolicies) != 0 {
t.Fatalf("unexpected policies: %#v", data.Databases[0].RetentionPolicies)
}
}
// Ensure an error is returned when deleting a policy from a non-existent database.
func TestData_DropRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
var data meta.Data
if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound {
t.Fatal(err)
}
}
// Ensure an error is returned when deleting a non-existent policy.
func TestData_DropRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrRetentionPolicyNotFound {
t.Fatal(err)
}
}
// Ensure that a retention policy can be retrieved.
func TestData_RetentionPolicy(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp1"}); err != nil {
t.Fatal(err)
}
if rpi, err := data.RetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{
Name: "rp0",
ShardGroupDuration: 604800000000000,
}) {
t.Fatalf("unexpected value: %#v", rpi)
}
}
// Ensure that retrieveing a policy from a non-existent database returns an error.
func TestData_RetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
var data meta.Data
if _, err := data.RetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound {
t.Fatal(err)
}
}
// Ensure that a default retention policy can be set.
func TestData_SetDefaultRetentionPolicy(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
// Verify there is no default policy on the database initially.
if name := data.Database("db0").DefaultRetentionPolicy; name != "" {
t.Fatalf("unexpected initial default retention policy: %s", name)
}
// Set the default policy.
if err := data.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
// Verify the default policy is now set.
if name := data.Database("db0").DefaultRetentionPolicy; name != "rp0" {
t.Fatalf("unexpected default retention policy: %s", name)
}
}
// Ensure that a shard group can be created on a database for a given timestamp.
func TestData_CreateShardGroup(t *testing.T) {
var data meta.Data
if err := data.CreateNode("node0"); err != nil {
t.Fatal(err)
} else if err = data.CreateNode("node1"); err != nil {
t.Fatal(err)
} else if err = data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}
// Create shard group.
if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}
// Verify the shard group was created.
if sgi, _ := data.ShardGroupByTimestamp("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); !reflect.DeepEqual(sgi, &meta.ShardGroupInfo{
ID: 1,
StartTime: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{ID: 1, OwnerIDs: []uint64{1}},
{ID: 2, OwnerIDs: []uint64{2}},
},
}) {
t.Fatalf("unexpected shard group: %#v", sgi)
}
}
// Ensure a shard group can be removed by ID.
func TestData_DeleteShardGroup(t *testing.T) {
var data meta.Data
if err := data.CreateNode("node0"); err != nil {
t.Fatal(err)
} else if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
} else if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}
if err := data.DeleteShardGroup("db0", "rp0", 1); err != nil {
t.Fatal(err)
} else if len(data.Databases[0].RetentionPolicies[0].ShardGroups) != 0 {
t.Fatalf("unexpected shard groups: %#v", data.Databases[0].RetentionPolicies[0].ShardGroups)
}
}
// Ensure a continuous query can be created.
func TestData_CreateContinuousQuery(t *testing.T) {
var data meta.Data
if err := data.CreateContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.ContinuousQueries, []meta.ContinuousQueryInfo{{Query: "SELECT count() FROM foo"}}) {
t.Fatalf("unexpected queries: %#v", data.ContinuousQueries)
}
}
// Ensure a continuous query can be removed.
func TestData_DropContinuousQuery(t *testing.T) {
var data meta.Data
if err := data.CreateContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
} else if err = data.CreateContinuousQuery("SELECT count() FROM bar"); err != nil {
t.Fatal(err)
}
if err := data.DropContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.ContinuousQueries, []meta.ContinuousQueryInfo{
{Query: "SELECT count() FROM bar"},
}) {
t.Fatalf("unexpected queries: %#v", data.ContinuousQueries)
}
}
// Ensure a user can be created.
func TestData_CreateUser(t *testing.T) {
var data meta.Data
if err := data.CreateUser("susy", "ABC123", true); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Users, []meta.UserInfo{
{Name: "susy", Hash: "ABC123", Admin: true},
}) {
t.Fatalf("unexpected users: %#v", data.Users)
}
}
// Ensure that creating a user with no username returns an error.
func TestData_CreateUser_ErrUsernameRequired(t *testing.T) {
var data meta.Data
if err := data.CreateUser("", "", false); err != meta.ErrUsernameRequired {
t.Fatal(err)
}
}
// Ensure that creating the same user twice returns an error.
func TestData_CreateUser_ErrUserExists(t *testing.T) {
var data meta.Data
if err := data.CreateUser("susy", "", false); err != nil {
t.Fatal(err)
}
if err := data.CreateUser("susy", "", false); err != meta.ErrUserExists {
t.Fatal(err)
}
}
// Ensure a user can be removed.
func TestData_DropUser(t *testing.T) {
var data meta.Data
if err := data.CreateUser("susy", "", false); err != nil {
t.Fatal(err)
} else if err := data.CreateUser("bob", "", false); err != nil {
t.Fatal(err)
}
if err := data.DropUser("bob"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Users, []meta.UserInfo{
{Name: "susy"},
}) {
t.Fatalf("unexpected users: %#v", data.Users)
}
}
// Ensure that removing a non-existent user returns an error.
func TestData_DropUser_ErrUserNotFound(t *testing.T) {
var data meta.Data
if err := data.DropUser("bob"); err != meta.ErrUserNotFound {
t.Fatal(err)
}
}
// Ensure a user can be updated.
func TestData_UpdateUser(t *testing.T) {
var data meta.Data
if err := data.CreateUser("susy", "", false); err != nil {
t.Fatal(err)
} else if err := data.CreateUser("bob", "", false); err != nil {
t.Fatal(err)
}
// Update password hash.
if err := data.UpdateUser("bob", "XXX"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.User("bob"), &meta.UserInfo{Name: "bob", Hash: "XXX"}) {
t.Fatalf("unexpected user: %#v", data.User("bob"))
}
}
// Ensure that updating a non-existent user returns an error.
func TestData_UpdateUser_ErrUserNotFound(t *testing.T) {
var data meta.Data
if err := data.UpdateUser("bob", "ZZZ"); err != meta.ErrUserNotFound {
t.Fatal(err)
}
}
// Ensure the data can be deeply copied.
func TestData_Clone(t *testing.T) {
data := meta.Data{
@ -21,7 +439,7 @@ func TestData_Clone(t *testing.T) {
{
Name: "db0",
DefaultRetentionPolicy: "default",
Policies: []meta.RetentionPolicyInfo{
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Name: "rp0",
ReplicaN: 3,
@ -42,9 +460,11 @@ func TestData_Clone(t *testing.T) {
},
},
},
ContinuousQueries: []meta.ContinuousQueryInfo{},
},
},
ContinuousQueries: []meta.ContinuousQueryInfo{
{Query: "SELECT count() FROM foo"},
},
Users: []meta.UserInfo{
{
Name: "susy",
@ -70,8 +490,8 @@ func TestData_Clone(t *testing.T) {
}
// Ensure that changing data in the clone does not affect the original.
other.Databases[0].Policies[0].ShardGroups[0].Shards[0].OwnerIDs[1] = 9
if v := data.Databases[0].Policies[0].ShardGroups[0].Shards[0].OwnerIDs[1]; v != 3 {
other.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1] = 9
if v := data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1]; v != 3 {
t.Fatalf("editing clone changed original: %v", v)
}
}

100
meta/errors.go Normal file
View File

@ -0,0 +1,100 @@
package meta
import "errors"
var (
// ErrStoreOpen is returned when opening an already open store.
ErrStoreOpen = errors.New("store already open")
// ErrStoreClosed is returned when closing an already closed store.
ErrStoreClosed = errors.New("raft store already closed")
)
var (
// ErrNodeExists is returned when creating an already existing node.
ErrNodeExists = errors.New("node already exists")
// ErrNodeNotFound is returned when mutating a node that doesn't exist.
ErrNodeNotFound = errors.New("node not found")
// ErrNodesRequired is returned when at least one node is required for an operation.
// This occurs when creating a shard group.
ErrNodesRequired = errors.New("at least one node required")
)
var (
// ErrDatabaseExists is returned when creating an already existing database.
ErrDatabaseExists = errors.New("database already exists")
// ErrDatabaseNotFound is returned when mutating a database that doesn't exist.
ErrDatabaseNotFound = errors.New("database not found")
// ErrDatabaseNameRequired is returned when creating a database without a name.
ErrDatabaseNameRequired = errors.New("database name required")
)
var (
// ErrRetentionPolicyExists is returned when creating an already existing policy.
ErrRetentionPolicyExists = errors.New("retention policy already exists")
// ErrRetentionPolicyNotFound is returned when mutating a policy that doesn't exist.
ErrRetentionPolicyNotFound = errors.New("retention policy not found")
// ErrRetentionPolicyNameRequired is returned when creating a policy without a name.
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")
// ErrRetentionPolicyNameExists is returned when renaming a policy to
// the same name as another existing policy.
ErrRetentionPolicyNameExists = errors.New("retention policy name already exists")
)
var (
// ErrShardGroupExists is returned when creating an already existing shard group.
ErrShardGroupExists = errors.New("shard group already exists")
// ErrShardGroupNotFound is returned when mutating a shard group that doesn't exist.
ErrShardGroupNotFound = errors.New("shard group not found")
)
var (
// ErrContinuousQueryExists is returned when creating an already existing continuous query.
ErrContinuousQueryExists = errors.New("continuous query already exists")
// ErrContinuousQueryNotFound is returned when removing a continuous query that doesn't exist.
ErrContinuousQueryNotFound = errors.New("continuous query not found")
)
var (
// ErrUserExists is returned when creating an already existing user.
ErrUserExists = errors.New("user already exists")
// ErrUserNotFound is returned when mutating a user that doesn't exist.
ErrUserNotFound = errors.New("user not found")
// ErrUsernameRequired is returned when creating a user without a username.
ErrUsernameRequired = errors.New("username required")
)
var errs = [...]error{
ErrStoreOpen, ErrStoreClosed,
ErrNodeExists, ErrNodeNotFound,
ErrDatabaseExists, ErrDatabaseNotFound, ErrDatabaseNameRequired,
}
// errLookup stores a mapping of error strings to well defined error types.
var errLookup = make(map[string]error)
func init() {
for _, err := range errs {
errLookup[err.Error()] = err
}
}
// lookupError returns a known error reference, if one exists.
// Otherwise returns err.
func lookupError(err error) error {
if e, ok := errLookup[err.Error()]; ok {
return e
}
return err
}

View File

@ -19,24 +19,22 @@ It has these top-level messages:
User
UserPrivilege
Command
CreateContinuousQueryCommand
DropContinuousQueryCommand
CreateNodeCommand
DeleteNodeCommand
CreateDatabaseCommand
DropDatabaseCommand
CreateDatabaseIfNotExistsCommand
CreateRetentionPolicyCommand
CreateRetentionPolicyIfNotExistsCommand
DeleteRetentionPolicyCommand
DropRetentionPolicyCommand
SetDefaultRetentionPolicyCommand
UpdateRetentionPolicyCommand
CreateShardGroupIfNotExistsCommand
CreateShardGroupCommand
DeleteShardGroupCommand
CreateContinuousQueryCommand
DropContinuousQueryCommand
CreateUserCommand
DeleteUserCommand
DropUserCommand
UpdateUserCommand
SetPrivilegeCommand
DeleteShardGroupCommand
*/
package internal
@ -50,65 +48,59 @@ var _ = math.Inf
type Command_Type int32
const (
Command_CreateContinuousQueryCommand Command_Type = 1
Command_DropContinuousQueryCommand Command_Type = 2
Command_CreateNodeCommand Command_Type = 3
Command_DeleteNodeCommand Command_Type = 4
Command_CreateDatabaseCommand Command_Type = 5
Command_DropDatabaseCommand Command_Type = 6
Command_CreateDatabaseIfNotExistsCommand Command_Type = 7
Command_CreateRetentionPolicyCommand Command_Type = 8
Command_CreateRetentionPolicyIfNotExistsCommand Command_Type = 9
Command_DeleteRetentionPolicyCommand Command_Type = 10
Command_SetDefaultRetentionPolicyCommand Command_Type = 11
Command_UpdateRetentionPolicyCommand Command_Type = 12
Command_CreateShardGroupIfNotExistsCommand Command_Type = 13
Command_CreateUserCommand Command_Type = 14
Command_DeleteUserCommand Command_Type = 15
Command_UpdateUserCommand Command_Type = 16
Command_SetPrivilegeCommand Command_Type = 17
Command_DeleteShardGroupCommand Command_Type = 18
Command_CreateNodeCommand Command_Type = 1
Command_DeleteNodeCommand Command_Type = 2
Command_CreateDatabaseCommand Command_Type = 3
Command_DropDatabaseCommand Command_Type = 4
Command_CreateRetentionPolicyCommand Command_Type = 5
Command_DropRetentionPolicyCommand Command_Type = 6
Command_SetDefaultRetentionPolicyCommand Command_Type = 7
Command_UpdateRetentionPolicyCommand Command_Type = 8
Command_CreateShardGroupCommand Command_Type = 9
Command_DeleteShardGroupCommand Command_Type = 10
Command_CreateContinuousQueryCommand Command_Type = 11
Command_DropContinuousQueryCommand Command_Type = 12
Command_CreateUserCommand Command_Type = 13
Command_DropUserCommand Command_Type = 14
Command_UpdateUserCommand Command_Type = 15
Command_SetPrivilegeCommand Command_Type = 16
)
var Command_Type_name = map[int32]string{
1: "CreateContinuousQueryCommand",
2: "DropContinuousQueryCommand",
3: "CreateNodeCommand",
4: "DeleteNodeCommand",
5: "CreateDatabaseCommand",
6: "DropDatabaseCommand",
7: "CreateDatabaseIfNotExistsCommand",
8: "CreateRetentionPolicyCommand",
9: "CreateRetentionPolicyIfNotExistsCommand",
10: "DeleteRetentionPolicyCommand",
11: "SetDefaultRetentionPolicyCommand",
12: "UpdateRetentionPolicyCommand",
13: "CreateShardGroupIfNotExistsCommand",
14: "CreateUserCommand",
15: "DeleteUserCommand",
16: "UpdateUserCommand",
17: "SetPrivilegeCommand",
18: "DeleteShardGroupCommand",
1: "CreateNodeCommand",
2: "DeleteNodeCommand",
3: "CreateDatabaseCommand",
4: "DropDatabaseCommand",
5: "CreateRetentionPolicyCommand",
6: "DropRetentionPolicyCommand",
7: "SetDefaultRetentionPolicyCommand",
8: "UpdateRetentionPolicyCommand",
9: "CreateShardGroupCommand",
10: "DeleteShardGroupCommand",
11: "CreateContinuousQueryCommand",
12: "DropContinuousQueryCommand",
13: "CreateUserCommand",
14: "DropUserCommand",
15: "UpdateUserCommand",
16: "SetPrivilegeCommand",
}
var Command_Type_value = map[string]int32{
"CreateContinuousQueryCommand": 1,
"DropContinuousQueryCommand": 2,
"CreateNodeCommand": 3,
"DeleteNodeCommand": 4,
"CreateDatabaseCommand": 5,
"DropDatabaseCommand": 6,
"CreateDatabaseIfNotExistsCommand": 7,
"CreateRetentionPolicyCommand": 8,
"CreateRetentionPolicyIfNotExistsCommand": 9,
"DeleteRetentionPolicyCommand": 10,
"SetDefaultRetentionPolicyCommand": 11,
"UpdateRetentionPolicyCommand": 12,
"CreateShardGroupIfNotExistsCommand": 13,
"CreateUserCommand": 14,
"DeleteUserCommand": 15,
"UpdateUserCommand": 16,
"SetPrivilegeCommand": 17,
"DeleteShardGroupCommand": 18,
"CreateNodeCommand": 1,
"DeleteNodeCommand": 2,
"CreateDatabaseCommand": 3,
"DropDatabaseCommand": 4,
"CreateRetentionPolicyCommand": 5,
"DropRetentionPolicyCommand": 6,
"SetDefaultRetentionPolicyCommand": 7,
"UpdateRetentionPolicyCommand": 8,
"CreateShardGroupCommand": 9,
"DeleteShardGroupCommand": 10,
"CreateContinuousQueryCommand": 11,
"DropContinuousQueryCommand": 12,
"CreateUserCommand": 13,
"DropUserCommand": 14,
"UpdateUserCommand": 15,
"SetPrivilegeCommand": 16,
}
func (x Command_Type) Enum() *Command_Type {
@ -436,55 +428,7 @@ func (m *Command) GetType() Command_Type {
if m != nil && m.Type != nil {
return *m.Type
}
return Command_CreateContinuousQueryCommand
}
type CreateContinuousQueryCommand struct {
Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateContinuousQueryCommand) Reset() { *m = CreateContinuousQueryCommand{} }
func (m *CreateContinuousQueryCommand) String() string { return proto.CompactTextString(m) }
func (*CreateContinuousQueryCommand) ProtoMessage() {}
func (m *CreateContinuousQueryCommand) GetQuery() string {
if m != nil && m.Query != nil {
return *m.Query
}
return ""
}
var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateContinuousQueryCommand)(nil),
Field: 100,
Name: "internal.CreateContinuousQueryCommand.command",
Tag: "bytes,100,req,name=command",
}
type DropContinuousQueryCommand struct {
Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DropContinuousQueryCommand) Reset() { *m = DropContinuousQueryCommand{} }
func (m *DropContinuousQueryCommand) String() string { return proto.CompactTextString(m) }
func (*DropContinuousQueryCommand) ProtoMessage() {}
func (m *DropContinuousQueryCommand) GetQuery() string {
if m != nil && m.Query != nil {
return *m.Query
}
return ""
}
var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DropContinuousQueryCommand)(nil),
Field: 101,
Name: "internal.DropContinuousQueryCommand.command",
Tag: "bytes,101,req,name=command",
return Command_CreateNodeCommand
}
type CreateNodeCommand struct {
@ -506,13 +450,13 @@ func (m *CreateNodeCommand) GetHost() string {
var E_CreateNodeCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateNodeCommand)(nil),
Field: 102,
Field: 101,
Name: "internal.CreateNodeCommand.command",
Tag: "bytes,102,req,name=command",
Tag: "bytes,101,opt,name=command",
}
type DeleteNodeCommand struct {
ID *string `protobuf:"bytes,1,req" json:"ID,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -520,19 +464,19 @@ func (m *DeleteNodeCommand) Reset() { *m = DeleteNodeCommand{} }
func (m *DeleteNodeCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteNodeCommand) ProtoMessage() {}
func (m *DeleteNodeCommand) GetID() string {
func (m *DeleteNodeCommand) GetID() uint64 {
if m != nil && m.ID != nil {
return *m.ID
}
return ""
return 0
}
var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteNodeCommand)(nil),
Field: 103,
Field: 102,
Name: "internal.DeleteNodeCommand.command",
Tag: "bytes,103,req,name=command",
Tag: "bytes,102,opt,name=command",
}
type CreateDatabaseCommand struct {
@ -554,9 +498,9 @@ func (m *CreateDatabaseCommand) GetName() string {
var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateDatabaseCommand)(nil),
Field: 104,
Field: 103,
Name: "internal.CreateDatabaseCommand.command",
Tag: "bytes,104,req,name=command",
Tag: "bytes,103,opt,name=command",
}
type DropDatabaseCommand struct {
@ -578,33 +522,9 @@ func (m *DropDatabaseCommand) GetName() string {
var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DropDatabaseCommand)(nil),
Field: 105,
Field: 104,
Name: "internal.DropDatabaseCommand.command",
Tag: "bytes,105,req,name=command",
}
type CreateDatabaseIfNotExistsCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateDatabaseIfNotExistsCommand) Reset() { *m = CreateDatabaseIfNotExistsCommand{} }
func (m *CreateDatabaseIfNotExistsCommand) String() string { return proto.CompactTextString(m) }
func (*CreateDatabaseIfNotExistsCommand) ProtoMessage() {}
func (m *CreateDatabaseIfNotExistsCommand) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
var E_CreateDatabaseIfNotExistsCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateDatabaseIfNotExistsCommand)(nil),
Field: 106,
Name: "internal.CreateDatabaseIfNotExistsCommand.command",
Tag: "bytes,106,req,name=command",
Tag: "bytes,104,opt,name=command",
}
type CreateRetentionPolicyCommand struct {
@ -634,75 +554,41 @@ func (m *CreateRetentionPolicyCommand) GetRetentionPolicy() *RetentionPolicyInfo
var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateRetentionPolicyCommand)(nil),
Field: 107,
Field: 105,
Name: "internal.CreateRetentionPolicyCommand.command",
Tag: "bytes,107,req,name=command",
Tag: "bytes,105,opt,name=command",
}
type CreateRetentionPolicyIfNotExistsCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateRetentionPolicyIfNotExistsCommand) Reset() {
*m = CreateRetentionPolicyIfNotExistsCommand{}
}
func (m *CreateRetentionPolicyIfNotExistsCommand) String() string { return proto.CompactTextString(m) }
func (*CreateRetentionPolicyIfNotExistsCommand) ProtoMessage() {}
func (m *CreateRetentionPolicyIfNotExistsCommand) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *CreateRetentionPolicyIfNotExistsCommand) GetRetentionPolicy() *RetentionPolicyInfo {
if m != nil {
return m.RetentionPolicy
}
return nil
}
var E_CreateRetentionPolicyIfNotExistsCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateRetentionPolicyIfNotExistsCommand)(nil),
Field: 108,
Name: "internal.CreateRetentionPolicyIfNotExistsCommand.command",
Tag: "bytes,108,req,name=command",
}
type DeleteRetentionPolicyCommand struct {
type DropRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DeleteRetentionPolicyCommand) Reset() { *m = DeleteRetentionPolicyCommand{} }
func (m *DeleteRetentionPolicyCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteRetentionPolicyCommand) ProtoMessage() {}
func (m *DropRetentionPolicyCommand) Reset() { *m = DropRetentionPolicyCommand{} }
func (m *DropRetentionPolicyCommand) String() string { return proto.CompactTextString(m) }
func (*DropRetentionPolicyCommand) ProtoMessage() {}
func (m *DeleteRetentionPolicyCommand) GetDatabase() string {
func (m *DropRetentionPolicyCommand) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *DeleteRetentionPolicyCommand) GetName() string {
func (m *DropRetentionPolicyCommand) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
var E_DeleteRetentionPolicyCommand_Command = &proto.ExtensionDesc{
var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteRetentionPolicyCommand)(nil),
Field: 109,
Name: "internal.DeleteRetentionPolicyCommand.command",
Tag: "bytes,109,req,name=command",
ExtensionType: (*DropRetentionPolicyCommand)(nil),
Field: 106,
Name: "internal.DropRetentionPolicyCommand.command",
Tag: "bytes,106,opt,name=command",
}
type SetDefaultRetentionPolicyCommand struct {
@ -732,9 +618,9 @@ func (m *SetDefaultRetentionPolicyCommand) GetName() string {
var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*SetDefaultRetentionPolicyCommand)(nil),
Field: 110,
Field: 107,
Name: "internal.SetDefaultRetentionPolicyCommand.command",
Tag: "bytes,110,req,name=command",
Tag: "bytes,107,opt,name=command",
}
type UpdateRetentionPolicyCommand struct {
@ -788,54 +674,142 @@ func (m *UpdateRetentionPolicyCommand) GetReplicaN() uint32 {
var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*UpdateRetentionPolicyCommand)(nil),
Field: 111,
Field: 108,
Name: "internal.UpdateRetentionPolicyCommand.command",
Tag: "bytes,111,req,name=command",
Tag: "bytes,108,opt,name=command",
}
type CreateShardGroupIfNotExistsCommand struct {
type CreateShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateShardGroupIfNotExistsCommand) Reset() { *m = CreateShardGroupIfNotExistsCommand{} }
func (m *CreateShardGroupIfNotExistsCommand) String() string { return proto.CompactTextString(m) }
func (*CreateShardGroupIfNotExistsCommand) ProtoMessage() {}
func (m *CreateShardGroupCommand) Reset() { *m = CreateShardGroupCommand{} }
func (m *CreateShardGroupCommand) String() string { return proto.CompactTextString(m) }
func (*CreateShardGroupCommand) ProtoMessage() {}
func (m *CreateShardGroupIfNotExistsCommand) GetDatabase() string {
func (m *CreateShardGroupCommand) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *CreateShardGroupIfNotExistsCommand) GetPolicy() string {
func (m *CreateShardGroupCommand) GetPolicy() string {
if m != nil && m.Policy != nil {
return *m.Policy
}
return ""
}
func (m *CreateShardGroupIfNotExistsCommand) GetTimestamp() int64 {
func (m *CreateShardGroupCommand) GetTimestamp() int64 {
if m != nil && m.Timestamp != nil {
return *m.Timestamp
}
return 0
}
var E_CreateShardGroupIfNotExistsCommand_Command = &proto.ExtensionDesc{
var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateShardGroupIfNotExistsCommand)(nil),
ExtensionType: (*CreateShardGroupCommand)(nil),
Field: 109,
Name: "internal.CreateShardGroupCommand.command",
Tag: "bytes,109,opt,name=command",
}
type DeleteShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DeleteShardGroupCommand) Reset() { *m = DeleteShardGroupCommand{} }
func (m *DeleteShardGroupCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteShardGroupCommand) ProtoMessage() {}
func (m *DeleteShardGroupCommand) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *DeleteShardGroupCommand) GetPolicy() string {
if m != nil && m.Policy != nil {
return *m.Policy
}
return ""
}
func (m *DeleteShardGroupCommand) GetShardGroupID() uint64 {
if m != nil && m.ShardGroupID != nil {
return *m.ShardGroupID
}
return 0
}
var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteShardGroupCommand)(nil),
Field: 110,
Name: "internal.DeleteShardGroupCommand.command",
Tag: "bytes,110,opt,name=command",
}
type CreateContinuousQueryCommand struct {
Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateContinuousQueryCommand) Reset() { *m = CreateContinuousQueryCommand{} }
func (m *CreateContinuousQueryCommand) String() string { return proto.CompactTextString(m) }
func (*CreateContinuousQueryCommand) ProtoMessage() {}
func (m *CreateContinuousQueryCommand) GetQuery() string {
if m != nil && m.Query != nil {
return *m.Query
}
return ""
}
var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*CreateContinuousQueryCommand)(nil),
Field: 111,
Name: "internal.CreateContinuousQueryCommand.command",
Tag: "bytes,111,opt,name=command",
}
type DropContinuousQueryCommand struct {
Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DropContinuousQueryCommand) Reset() { *m = DropContinuousQueryCommand{} }
func (m *DropContinuousQueryCommand) String() string { return proto.CompactTextString(m) }
func (*DropContinuousQueryCommand) ProtoMessage() {}
func (m *DropContinuousQueryCommand) GetQuery() string {
if m != nil && m.Query != nil {
return *m.Query
}
return ""
}
var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DropContinuousQueryCommand)(nil),
Field: 112,
Name: "internal.CreateShardGroupIfNotExistsCommand.command",
Tag: "bytes,112,req,name=command",
Name: "internal.DropContinuousQueryCommand.command",
Tag: "bytes,112,opt,name=command",
}
type CreateUserCommand struct {
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Password *string `protobuf:"bytes,2,req" json:"Password,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -844,16 +818,16 @@ func (m *CreateUserCommand) Reset() { *m = CreateUserCommand{} }
func (m *CreateUserCommand) String() string { return proto.CompactTextString(m) }
func (*CreateUserCommand) ProtoMessage() {}
func (m *CreateUserCommand) GetUsername() string {
if m != nil && m.Username != nil {
return *m.Username
func (m *CreateUserCommand) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *CreateUserCommand) GetPassword() string {
if m != nil && m.Password != nil {
return *m.Password
func (m *CreateUserCommand) GetHash() string {
if m != nil && m.Hash != nil {
return *m.Hash
}
return ""
}
@ -870,36 +844,36 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{
ExtensionType: (*CreateUserCommand)(nil),
Field: 113,
Name: "internal.CreateUserCommand.command",
Tag: "bytes,113,req,name=command",
Tag: "bytes,113,opt,name=command",
}
type DeleteUserCommand struct {
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
type DropUserCommand struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DeleteUserCommand) Reset() { *m = DeleteUserCommand{} }
func (m *DeleteUserCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteUserCommand) ProtoMessage() {}
func (m *DropUserCommand) Reset() { *m = DropUserCommand{} }
func (m *DropUserCommand) String() string { return proto.CompactTextString(m) }
func (*DropUserCommand) ProtoMessage() {}
func (m *DeleteUserCommand) GetUsername() string {
if m != nil && m.Username != nil {
return *m.Username
func (m *DropUserCommand) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
var E_DeleteUserCommand_Command = &proto.ExtensionDesc{
var E_DropUserCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteUserCommand)(nil),
ExtensionType: (*DropUserCommand)(nil),
Field: 114,
Name: "internal.DeleteUserCommand.command",
Tag: "bytes,114,req,name=command",
Name: "internal.DropUserCommand.command",
Tag: "bytes,114,opt,name=command",
}
type UpdateUserCommand struct {
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Password *string `protobuf:"bytes,2,req" json:"Password,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -907,16 +881,16 @@ func (m *UpdateUserCommand) Reset() { *m = UpdateUserCommand{} }
func (m *UpdateUserCommand) String() string { return proto.CompactTextString(m) }
func (*UpdateUserCommand) ProtoMessage() {}
func (m *UpdateUserCommand) GetUsername() string {
if m != nil && m.Username != nil {
return *m.Username
func (m *UpdateUserCommand) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *UpdateUserCommand) GetPassword() string {
if m != nil && m.Password != nil {
return *m.Password
func (m *UpdateUserCommand) GetHash() string {
if m != nil && m.Hash != nil {
return *m.Hash
}
return ""
}
@ -926,7 +900,7 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{
ExtensionType: (*UpdateUserCommand)(nil),
Field: 115,
Name: "internal.UpdateUserCommand.command",
Tag: "bytes,115,req,name=command",
Tag: "bytes,115,opt,name=command",
}
type SetPrivilegeCommand struct {
@ -958,67 +932,25 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{
ExtensionType: (*SetPrivilegeCommand)(nil),
Field: 116,
Name: "internal.SetPrivilegeCommand.command",
Tag: "bytes,116,req,name=command",
}
type DeleteShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
ShardID *uint64 `protobuf:"varint,3,req" json:"ShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DeleteShardGroupCommand) Reset() { *m = DeleteShardGroupCommand{} }
func (m *DeleteShardGroupCommand) String() string { return proto.CompactTextString(m) }
func (*DeleteShardGroupCommand) ProtoMessage() {}
func (m *DeleteShardGroupCommand) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *DeleteShardGroupCommand) GetPolicy() string {
if m != nil && m.Policy != nil {
return *m.Policy
}
return ""
}
func (m *DeleteShardGroupCommand) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*DeleteShardGroupCommand)(nil),
Field: 117,
Name: "internal.DeleteShardGroupCommand.command",
Tag: "bytes,117,req,name=command",
Tag: "bytes,116,opt,name=command",
}
func init() {
proto.RegisterEnum("internal.Command_Type", Command_Type_name, Command_Type_value)
proto.RegisterExtension(E_CreateContinuousQueryCommand_Command)
proto.RegisterExtension(E_DropContinuousQueryCommand_Command)
proto.RegisterExtension(E_CreateNodeCommand_Command)
proto.RegisterExtension(E_DeleteNodeCommand_Command)
proto.RegisterExtension(E_CreateDatabaseCommand_Command)
proto.RegisterExtension(E_DropDatabaseCommand_Command)
proto.RegisterExtension(E_CreateDatabaseIfNotExistsCommand_Command)
proto.RegisterExtension(E_CreateRetentionPolicyCommand_Command)
proto.RegisterExtension(E_CreateRetentionPolicyIfNotExistsCommand_Command)
proto.RegisterExtension(E_DeleteRetentionPolicyCommand_Command)
proto.RegisterExtension(E_DropRetentionPolicyCommand_Command)
proto.RegisterExtension(E_SetDefaultRetentionPolicyCommand_Command)
proto.RegisterExtension(E_UpdateRetentionPolicyCommand_Command)
proto.RegisterExtension(E_CreateShardGroupIfNotExistsCommand_Command)
proto.RegisterExtension(E_CreateShardGroupCommand_Command)
proto.RegisterExtension(E_DeleteShardGroupCommand_Command)
proto.RegisterExtension(E_CreateContinuousQueryCommand_Command)
proto.RegisterExtension(E_DropContinuousQueryCommand_Command)
proto.RegisterExtension(E_CreateUserCommand_Command)
proto.RegisterExtension(E_DeleteUserCommand_Command)
proto.RegisterExtension(E_DropUserCommand_Command)
proto.RegisterExtension(E_UpdateUserCommand_Command)
proto.RegisterExtension(E_SetPrivilegeCommand_Command)
proto.RegisterExtension(E_DeleteShardGroupCommand_Command)
}

View File

@ -70,97 +70,66 @@ message Command {
extensions 100 to max;
enum Type {
CreateContinuousQueryCommand = 1;
DropContinuousQueryCommand = 2;
CreateNodeCommand = 3;
DeleteNodeCommand = 4;
CreateDatabaseCommand = 5;
DropDatabaseCommand = 6;
CreateDatabaseIfNotExistsCommand = 7;
CreateRetentionPolicyCommand = 8;
CreateRetentionPolicyIfNotExistsCommand = 9;
DeleteRetentionPolicyCommand = 10;
SetDefaultRetentionPolicyCommand = 11;
UpdateRetentionPolicyCommand = 12;
CreateShardGroupIfNotExistsCommand = 13;
CreateUserCommand = 14;
DeleteUserCommand = 15;
UpdateUserCommand = 16;
SetPrivilegeCommand = 17;
DeleteShardGroupCommand = 18;
CreateNodeCommand = 1;
DeleteNodeCommand = 2;
CreateDatabaseCommand = 3;
DropDatabaseCommand = 4;
CreateRetentionPolicyCommand = 5;
DropRetentionPolicyCommand = 6;
SetDefaultRetentionPolicyCommand = 7;
UpdateRetentionPolicyCommand = 8;
CreateShardGroupCommand = 9;
DeleteShardGroupCommand = 10;
CreateContinuousQueryCommand = 11;
DropContinuousQueryCommand = 12;
CreateUserCommand = 13;
DropUserCommand = 14;
UpdateUserCommand = 15;
SetPrivilegeCommand = 16;
}
required Type type = 1;
}
message CreateContinuousQueryCommand {
extend Command {
required CreateContinuousQueryCommand command = 100;
}
required string Query = 1;
}
message DropContinuousQueryCommand {
extend Command {
required DropContinuousQueryCommand command = 101;
}
required string Query = 1;
}
message CreateNodeCommand {
extend Command {
required CreateNodeCommand command = 102;
optional CreateNodeCommand command = 101;
}
required string Host = 1;
}
message DeleteNodeCommand {
extend Command {
required DeleteNodeCommand command = 103;
optional DeleteNodeCommand command = 102;
}
required string ID = 1;
required uint64 ID = 1;
}
message CreateDatabaseCommand {
extend Command {
required CreateDatabaseCommand command = 104;
optional CreateDatabaseCommand command = 103;
}
required string Name = 1;
}
message DropDatabaseCommand {
extend Command {
required DropDatabaseCommand command = 105;
}
required string Name = 1;
}
message CreateDatabaseIfNotExistsCommand {
extend Command {
required CreateDatabaseIfNotExistsCommand command = 106;
optional DropDatabaseCommand command = 104;
}
required string Name = 1;
}
message CreateRetentionPolicyCommand {
extend Command {
required CreateRetentionPolicyCommand command = 107;
optional CreateRetentionPolicyCommand command = 105;
}
required string Database = 1;
required RetentionPolicyInfo RetentionPolicy = 2;
}
message CreateRetentionPolicyIfNotExistsCommand {
message DropRetentionPolicyCommand {
extend Command {
required CreateRetentionPolicyIfNotExistsCommand command = 108;
}
required string Database = 1;
required RetentionPolicyInfo RetentionPolicy = 2;
}
message DeleteRetentionPolicyCommand {
extend Command {
required DeleteRetentionPolicyCommand command = 109;
optional DropRetentionPolicyCommand command = 106;
}
required string Database = 1;
required string Name = 2;
@ -168,7 +137,7 @@ message DeleteRetentionPolicyCommand {
message SetDefaultRetentionPolicyCommand {
extend Command {
required SetDefaultRetentionPolicyCommand command = 110;
optional SetDefaultRetentionPolicyCommand command = 107;
}
required string Database = 1;
required string Name = 2;
@ -176,7 +145,7 @@ message SetDefaultRetentionPolicyCommand {
message UpdateRetentionPolicyCommand {
extend Command {
required UpdateRetentionPolicyCommand command = 111;
optional UpdateRetentionPolicyCommand command = 108;
}
required string Database = 1;
required string Name = 2;
@ -185,53 +154,67 @@ message UpdateRetentionPolicyCommand {
optional uint32 ReplicaN = 5;
}
message CreateShardGroupIfNotExistsCommand {
message CreateShardGroupCommand {
extend Command {
required CreateShardGroupIfNotExistsCommand command = 112;
optional CreateShardGroupCommand command = 109;
}
required string Database = 1;
required string Policy = 2;
required int64 Timestamp = 3;
}
message DeleteShardGroupCommand {
extend Command {
optional DeleteShardGroupCommand command = 110;
}
required string Database = 1;
required string Policy = 2;
required uint64 ShardGroupID = 3;
}
message CreateContinuousQueryCommand {
extend Command {
optional CreateContinuousQueryCommand command = 111;
}
required string Query = 1;
}
message DropContinuousQueryCommand {
extend Command {
optional DropContinuousQueryCommand command = 112;
}
required string Query = 1;
}
message CreateUserCommand {
extend Command {
required CreateUserCommand command = 113;
optional CreateUserCommand command = 113;
}
required string Username = 1;
required string Password = 2;
required string Name = 1;
required string Hash = 2;
required bool Admin = 3;
}
message DeleteUserCommand {
message DropUserCommand {
extend Command {
required DeleteUserCommand command = 114;
optional DropUserCommand command = 114;
}
required string Username = 1;
required string Name = 1;
}
message UpdateUserCommand {
extend Command {
required UpdateUserCommand command = 115;
optional UpdateUserCommand command = 115;
}
required string Username = 1;
required string Password = 2;
required string Name = 1;
required string Hash = 2;
}
message SetPrivilegeCommand {
extend Command {
required SetPrivilegeCommand command = 116;
optional SetPrivilegeCommand command = 116;
}
required string Username = 1;
required UserPrivilege Privilege = 2;
}
message DeleteShardGroupCommand {
extend Command {
required DeleteShardGroupCommand command = 117;
}
required string Database = 1;
required string Policy = 2;
required uint64 ShardID = 3;
}

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/influxdb/influxdb/meta/internal"
"golang.org/x/crypto/bcrypt"
)
const (
@ -95,7 +96,7 @@ func (s *Store) Open(path string) error {
// Check if store has already been opened.
if s.opened() {
return errors.New("raft store already open")
return ErrStoreOpen
}
s.path = path
@ -143,7 +144,7 @@ func (s *Store) Open(path string) error {
}
// Create raft log.
r, err := raft.NewRaft(config, (*StoreFSM)(s), store, store, snapshots, s.peers, s.transport)
r, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, s.peers, s.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
@ -168,7 +169,7 @@ func (s *Store) Close() error {
func (s *Store) close() error {
// Check if store has already been closed.
if !s.opened() {
return errors.New("raft store already closed")
return ErrStoreClosed
}
s.path = ""
@ -188,31 +189,42 @@ func (s *Store) close() error {
return nil
}
// SetData sets the root meta data.
func (s *Store) SetData(data *Data) {
s.mu.Lock()
defer s.mu.Unlock()
s.data = data
}
// LeaderCh returns a channel that notifies on leadership change.
// Panics when the store has not been opened yet.
func (s *Store) LeaderCh() <-chan bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raft == nil {
panic("cannot retrieve leadership channel when closed")
}
assert(s.raft != nil, "cannot retrieve leadership channel when closed")
return s.raft.LeaderCh()
}
// Node returns a node by id.
func (s *Store) Node(id uint64) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {
ni = data.Node(id)
if ni == nil {
return errInvalidate
}
return nil
})
return
}
// NodeByHost returns a node by hostname.
func (s *Store) NodeByHost(host string) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {
ni = data.NodeByHost(host)
if ni == nil {
return errInvalidate
}
return nil
})
return
}
// CreateNode creates a new node in the store.
func (s *Store) CreateNode(host string) (*NodeInfo, error) {
if err := s.exec(
internal.Command_CreateNodeCommand,
internal.E_CreateNodeCommand_Command,
if err := s.exec(internal.Command_CreateNodeCommand, internal.E_CreateNodeCommand_Command,
&internal.CreateNodeCommand{
Host: proto.String(host),
},
@ -222,57 +234,318 @@ func (s *Store) CreateNode(host string) (*NodeInfo, error) {
return s.NodeByHost(host)
}
// NodeByHost returns a node by hostname.
func (s *Store) NodeByHost(host string) (*NodeInfo, error) {
n := s.data.NodeByHost(host)
// FIX(benbjohnson): Invalidate cache if not found and check again.
return n, nil
// DeleteNode removes a node from the metastore by id.
func (s *Store) DeleteNode(id uint64) error {
return s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command,
&internal.DeleteNodeCommand{
ID: proto.Uint64(id),
},
)
}
// CreateContinuousQuery(query string) (*ContinuousQueryInfo, error)
// DropContinuousQuery(query string) error
// Database returns a database by name.
func (s *Store) Database(name string) (di *DatabaseInfo, err error) {
err = s.read(func(data *Data) error {
di = data.Database(name)
if di == nil {
return errInvalidate
}
return nil
})
return
}
// Node(id uint64) (*NodeInfo, error)
// NodeByHost(host string) (*NodeInfo, error)
// CreateNode(host string) (*NodeInfo, error)
// DeleteNode(id uint64) error
// CreateDatabase creates a new database in the store.
func (s *Store) CreateDatabase(name string) (*DatabaseInfo, error) {
if err := s.exec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command,
&internal.CreateDatabaseCommand{
Name: proto.String(name),
},
); err != nil {
return nil, err
}
return s.Database(name)
}
// Database(name string) (*DatabaseInfo, error)
// CreateDatabase(name string) (*DatabaseInfo, error)
// CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error)
// DropDatabase(name string) error
// FIX: CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error)
// RetentionPolicy(database, name string) (*RetentionPolicyInfo, error)
// CreateRetentionPolicy(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
// CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
// SetDefaultRetentionPolicy(database, name string) error
// UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) (*RetentionPolicyInfo, error)
// DeleteRetentionPolicy(database, name string) error
// DropDatabase removes a database from the metastore by name.
func (s *Store) DropDatabase(name string) error {
return s.exec(internal.Command_DropDatabaseCommand, internal.E_DropDatabaseCommand_Command,
&internal.DropDatabaseCommand{
Name: proto.String(name),
},
)
}
// ShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error)
// CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroupInfo, error)
// DeleteShardGroup(database, policy string, shardID uint64) error
// RetentionPolicy returns a retention policy for a database by name.
func (s *Store) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) {
err = s.read(func(data *Data) error {
rpi, err = data.RetentionPolicy(database, name)
if err != nil {
return err
} else if rpi == nil {
return errInvalidate
}
return nil
})
return
}
// User(username string) (*UserInfo, error)
// CreateUser(username, password string, admin bool) (*UserInfo, error)
// UpdateUser(username, password string) (*UserInfo, error)
// DeleteUser(username string) error
// SetPrivilege(p influxql.Privilege, username string, dbname string) error
// CreateRetentionPolicy creates a new retention policy for a database.
func (s *Store) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
if err := s.exec(internal.Command_CreateRetentionPolicyCommand, internal.E_CreateRetentionPolicyCommand_Command,
&internal.CreateRetentionPolicyCommand{
Database: proto.String(database),
RetentionPolicy: rpi.protobuf(),
},
); err != nil {
return nil, err
}
return s.RetentionPolicy(database, rpi.Name)
}
// SetDefaultRetentionPolicy sets the default retention policy for a database.
func (s *Store) SetDefaultRetentionPolicy(database, name string) error {
return s.exec(internal.Command_SetDefaultRetentionPolicyCommand, internal.E_SetDefaultRetentionPolicyCommand_Command,
&internal.SetDefaultRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
},
)
}
// UpdateRetentionPolicy updates an existing retention policy.
func (s *Store) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
var newName *string
if rpu.Name != nil {
newName = rpu.Name
}
var duration *int64
if rpu.Duration != nil {
value := int64(*rpu.Duration)
duration = &value
}
var replicaN *uint32
if rpu.Duration != nil {
value := uint32(*rpu.ReplicaN)
replicaN = &value
}
return s.exec(internal.Command_UpdateRetentionPolicyCommand, internal.E_UpdateRetentionPolicyCommand_Command,
&internal.UpdateRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
NewName: newName,
Duration: duration,
ReplicaN: replicaN,
},
)
}
// DropRetentionPolicy removes a policy from a database by name.
func (s *Store) DropRetentionPolicy(database, name string) error {
return s.exec(internal.Command_DropRetentionPolicyCommand, internal.E_DropRetentionPolicyCommand_Command,
&internal.DropRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
},
)
}
// FIX: CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
// CreateShardGroup creates a new shard group in a retention policy for a given time.
func (s *Store) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
if err := s.exec(internal.Command_CreateShardGroupCommand, internal.E_CreateShardGroupCommand_Command,
&internal.CreateShardGroupCommand{
Database: proto.String(database),
Policy: proto.String(policy),
Timestamp: proto.Int64(timestamp.UnixNano()),
},
); err != nil {
return nil, err
}
return s.ShardGroupByTimestamp(database, policy, timestamp)
}
// DeleteShardGroup removes an existing shard group from a policy by ID.
func (s *Store) DeleteShardGroup(database, policy string, id uint64) error {
return s.exec(internal.Command_DeleteShardGroupCommand, internal.E_DeleteShardGroupCommand_Command,
&internal.DeleteShardGroupCommand{
Database: proto.String(database),
Policy: proto.String(policy),
ShardGroupID: proto.Uint64(id),
},
)
}
// ShardGroupByTimestamp returns a shard group for a policy by timestamp.
func (s *Store) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (sgi *ShardGroupInfo, err error) {
err = s.read(func(data *Data) error {
sgi, err = data.ShardGroupByTimestamp(database, policy, timestamp)
if err != nil {
return err
} else if sgi == nil {
return errInvalidate
}
return nil
})
return
}
// CreateContinuousQuery creates a new continuous query on the store.
func (s *Store) CreateContinuousQuery(query string) error {
return s.exec(internal.Command_CreateContinuousQueryCommand, internal.E_CreateContinuousQueryCommand_Command,
&internal.CreateContinuousQueryCommand{
Query: proto.String(query),
},
)
}
// DropContinuousQuery removes a continuous query from the store.
func (s *Store) DropContinuousQuery(query string) error {
return s.exec(internal.Command_DropContinuousQueryCommand, internal.E_DropContinuousQueryCommand_Command,
&internal.DropContinuousQueryCommand{
Query: proto.String(query),
},
)
}
// ContinuousQueries returns all continuous queries.
func (s *Store) ContinuousQueries() (a []ContinuousQueryInfo, err error) {
err = s.read(func(data *Data) error {
a = data.ContinuousQueries
return nil
})
return
}
// User returns a user by name.
func (s *Store) User(name string) (ui *UserInfo, err error) {
err = s.read(func(data *Data) error {
ui = data.User(name)
if ui == nil {
return errInvalidate
}
return nil
})
return
}
// Users returns a list of all users.
func (s *Store) Users() (a []UserInfo, err error) {
err = s.read(func(data *Data) error {
a = data.Users
return nil
})
return
}
// CreateUser creates a new user in the store.
func (s *Store) CreateUser(name, password string, admin bool) (*UserInfo, error) {
// Hash the password before serializing it.
hash, err := HashPassword(password)
if err != nil {
return nil, err
}
// Serialize command and send it to the leader.
if err := s.exec(internal.Command_CreateUserCommand, internal.E_CreateUserCommand_Command,
&internal.CreateUserCommand{
Name: proto.String(name),
Hash: proto.String(string(hash)),
Admin: proto.Bool(admin),
},
); err != nil {
return nil, err
}
return s.User(name)
}
// DropUser removes a user from the metastore by name.
func (s *Store) DropUser(name string) error {
return s.exec(internal.Command_DropUserCommand, internal.E_DropUserCommand_Command,
&internal.DropUserCommand{
Name: proto.String(name),
},
)
}
// UpdateUser updates an existing user in the store.
func (s *Store) UpdateUser(name, password string) error {
// Hash the password before serializing it.
hash, err := HashPassword(password)
if err != nil {
return err
}
// Serialize command and send it to the leader.
return s.exec(internal.Command_UpdateUserCommand, internal.E_UpdateUserCommand_Command,
&internal.UpdateUserCommand{
Name: proto.String(name),
Hash: proto.String(string(hash)),
},
)
}
// read executes a function with the current metadata.
// If an error is returned then the cache is invalidated and retried.
//
// The error returned by the retry is passed through to the original caller
// unless the error is errInvalidate. A nil error is passed through when
// errInvalidate is returned.
func (s *Store) read(fn func(*Data) error) error {
// First use the cached metadata.
s.mu.RLock()
data := s.data
s.mu.RUnlock()
// Execute fn against cached data.
// Return immediately if there was no error.
if err := fn(data); err == nil {
return nil
}
// If an error occurred then invalidate cache and retry.
if err := s.invalidate(); err != nil {
return err
}
// Re-read the metadata.
s.mu.RLock()
data = s.data
s.mu.RUnlock()
// Passthrough error unless it is a cache invalidation.
if err := fn(data); err != nil && err != errInvalidate {
return err
}
return nil
}
// errInvalidate is returned to read() when the cache should be invalidated
// but an error should not be passed through to the caller.
var errInvalidate = errors.New("invalidate cache")
func (s *Store) invalidate() error {
return nil // FIX(benbjohnson): Reload cache from the leader.
}
func (s *Store) exec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
// Create command.
cmd := &internal.Command{Type: &typ}
if err := proto.SetExtension(cmd, desc, value); err != nil {
return fmt.Errorf("set extension: %s", err)
}
err := proto.SetExtension(cmd, desc, value)
assert(err == nil, "proto.SetExtension: %s", err)
// Marshal to a byte slice.
b, err := proto.Marshal(cmd)
if err != nil {
return fmt.Errorf("marshal proto: %s", err)
}
assert(err == nil, "proto.Marshal: %s", err)
// Apply to raft log.
f := s.raft.Apply(b, 0)
@ -280,27 +553,32 @@ func (s *Store) exec(typ internal.Command_Type, desc *proto.ExtensionDesc, value
return err
}
if resp := f.Response(); resp != nil {
panic(fmt.Sprintf("unexpected response: %#v", resp))
// Return response if it's an error.
// No other non-nil objects should be returned.
resp := f.Response()
if err, ok := resp.(error); ok {
return lookupError(err)
}
assert(resp == nil, "unexpected response: %#v", resp)
return nil
}
// StoreFSM represents the finite state machine used by Store to interact with Raft.
type StoreFSM Store
// storeFSM represents the finite state machine used by Store to interact with Raft.
type storeFSM Store
func (fsm *StoreFSM) Apply(l *raft.Log) interface{} {
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
var cmd internal.Command
if err := proto.Unmarshal(l.Data, &cmd); err != nil {
panic(fmt.Errorf("cannot marshal command: %x", l.Data))
}
// Lock the store.
s := (*Store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
switch cmd.GetType() {
case internal.Command_CreateContinuousQueryCommand:
return fsm.applyCreateContinuousQueryCommand(&cmd)
case internal.Command_DropContinuousQueryCommand:
return fsm.applyDropContinuousQueryCommand(&cmd)
case internal.Command_CreateNodeCommand:
return fsm.applyCreateNodeCommand(&cmd)
case internal.Command_DeleteNodeCommand:
@ -309,24 +587,24 @@ func (fsm *StoreFSM) Apply(l *raft.Log) interface{} {
return fsm.applyCreateDatabaseCommand(&cmd)
case internal.Command_DropDatabaseCommand:
return fsm.applyDropDatabaseCommand(&cmd)
case internal.Command_CreateDatabaseIfNotExistsCommand:
return fsm.applyCreateDatabaseIfNotExistsCommand(&cmd)
case internal.Command_CreateRetentionPolicyCommand:
return fsm.applyCreateRetentionPolicyCommand(&cmd)
case internal.Command_CreateRetentionPolicyIfNotExistsCommand:
return fsm.applyCreateRetentionPolicyIfNotExistsCommand(&cmd)
case internal.Command_DeleteRetentionPolicyCommand:
return fsm.applyDeleteRetentionPolicyCommand(&cmd)
case internal.Command_DropRetentionPolicyCommand:
return fsm.applyDropRetentionPolicyCommand(&cmd)
case internal.Command_SetDefaultRetentionPolicyCommand:
return fsm.applySetDefaultRetentionPolicyCommand(&cmd)
case internal.Command_UpdateRetentionPolicyCommand:
return fsm.applyUpdateRetentionPolicyCommand(&cmd)
case internal.Command_CreateShardGroupIfNotExistsCommand:
return fsm.applyCreateShardGroupIfNotExistsCommand(&cmd)
case internal.Command_CreateShardGroupCommand:
return fsm.applyCreateShardGroupCommand(&cmd)
case internal.Command_CreateContinuousQueryCommand:
return fsm.applyCreateContinuousQueryCommand(&cmd)
case internal.Command_DropContinuousQueryCommand:
return fsm.applyDropContinuousQueryCommand(&cmd)
case internal.Command_CreateUserCommand:
return fsm.applyCreateUserCommand(&cmd)
case internal.Command_DeleteUserCommand:
return fsm.applyDeleteUserCommand(&cmd)
case internal.Command_DropUserCommand:
return fsm.applyDropUserCommand(&cmd)
case internal.Command_UpdateUserCommand:
return fsm.applyUpdateUserCommand(&cmd)
case internal.Command_SetPrivilegeCommand:
@ -338,97 +616,245 @@ func (fsm *StoreFSM) Apply(l *raft.Log) interface{} {
}
}
func (fsm *StoreFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command)
v := ext.(*internal.CreateNodeCommand)
// Copy data and update.
data, err := fsm.data.CreateNode(v.GetHost())
if err != nil {
other := fsm.data.Clone()
if err := other.CreateNode(v.GetHost()); err != nil {
return err
}
(*Store)(fsm).SetData(data)
fsm.data = other
return nil
}
func (fsm *StoreFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteNodeCommand_Command)
v := ext.(*internal.DeleteNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteNode(v.GetID()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
v := ext.(*internal.CreateDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropDatabaseCommand_Command)
v := ext.(*internal.DropDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateRetentionPolicyCommand_Command)
v := ext.(*internal.CreateRetentionPolicyCommand)
pb := v.GetRetentionPolicy()
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateRetentionPolicy(v.GetDatabase(),
&RetentionPolicyInfo{
Name: pb.GetName(),
ReplicaN: int(pb.GetReplicaN()),
Duration: time.Duration(pb.GetDuration()),
ShardGroupDuration: time.Duration(pb.GetShardGroupDuration()),
}); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropRetentionPolicyCommand_Command)
v := ext.(*internal.DropRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetDefaultRetentionPolicyCommand_Command)
v := ext.(*internal.SetDefaultRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetDefaultRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateRetentionPolicyCommand_Command)
v := ext.(*internal.UpdateRetentionPolicyCommand)
// Create update object.
rpu := RetentionPolicyUpdate{Name: v.NewName}
if v.Duration != nil {
value := time.Duration(v.GetDuration())
rpu.Duration = &value
}
if v.ReplicaN != nil {
value := int(v.GetReplicaN())
rpu.ReplicaN = &value
}
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateRetentionPolicy(v.GetDatabase(), v.GetName(), &rpu); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateShardGroupCommand_Command)
v := ext.(*internal.CreateShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateShardGroup(v.GetDatabase(), v.GetPolicy(), time.Unix(0, v.GetTimestamp())); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteShardGroupCommand_Command)
v := ext.(*internal.DeleteShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteShardGroup(v.GetDatabase(), v.GetPolicy(), v.GetShardGroupID()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateContinuousQueryCommand_Command)
v := ext.(*internal.CreateContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateContinuousQuery(v.GetQuery()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropContinuousQueryCommand_Command)
v := ext.(*internal.DropContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropContinuousQuery(v.GetQuery()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateUserCommand_Command)
v := ext.(*internal.CreateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateUser(v.GetName(), v.GetHash(), v.GetAdmin()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropUserCommand_Command)
v := ext.(*internal.DropUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropUser(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateUserCommand_Command)
v := ext.(*internal.UpdateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateUser(v.GetName(), v.GetHash()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateDatabaseIfNotExistsCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateRetentionPolicyIfNotExistsCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyDeleteRetentionPolicyCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateShardGroupIfNotExistsCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyCreateUserCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyDeleteUserCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} {
panic("not yet implemented")
}
func (fsm *StoreFSM) Snapshot() (raft.FSMSnapshot, error) {
func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) {
s := (*Store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
return &StoreFSMSnapshot{Data: (*Store)(fsm).data}, nil
return &storeFSMSnapshot{Data: (*Store)(fsm).data}, nil
}
func (fsm *StoreFSM) Restore(r io.ReadCloser) error {
func (fsm *storeFSM) Restore(r io.ReadCloser) error {
// Read all bytes.
// b, err := ioutil.ReadAll(r)
// if err != nil {
@ -441,11 +867,11 @@ func (fsm *StoreFSM) Restore(r io.ReadCloser) error {
return nil
}
type StoreFSMSnapshot struct {
type storeFSMSnapshot struct {
Data *Data
}
func (s *StoreFSMSnapshot) Persist(sink raft.SnapshotSink) error {
func (s *storeFSMSnapshot) Persist(sink raft.SnapshotSink) error {
// TODO: Encode data.
// TODO: sink.Write(p)
// TODO: sink.Close()
@ -453,13 +879,36 @@ func (s *StoreFSMSnapshot) Persist(sink raft.SnapshotSink) error {
}
// Release is invoked when we are finished with the snapshot
func (s *StoreFSMSnapshot) Release() {}
func (s *storeFSMSnapshot) Release() {}
// RetentionPolicyUpdate represents retention policy fields to be updated.
type RetentionPolicyUpdate struct {
Name *string
Duration *time.Duration
ReplicaN *uint32
ReplicaN *int
}
func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v }
func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v }
func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
// BcryptCost is the cost associated with generating password with Bcrypt.
// This setting is lowered during testing to improve test suite performance.
var BcryptCost = 10
// HashPassword generates a cryptographically secure hash for password.
// Returns an error if the password is invalid or a hash cannot be generated.
func HashPassword(password string) ([]byte, error) {
// The second arg is the cost of the hashing, higher is slower but makes
// it harder to brute force, since it will be really slow and impractical
return bcrypt.GenerateFromPassword([]byte(password), BcryptCost)
}
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"os"
"reflect"
"testing"
"time"
@ -39,18 +40,499 @@ func TestStore_Open(t *testing.T) {
}
}
// Ensure the store returns an error
func TestStore_Open_ErrStoreOpen(t *testing.T) {
s := MustOpenStore()
defer s.Close()
if err := s.Open(s.Path()); err != meta.ErrStoreOpen {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can create a new node.
func TestStore_CreateNode(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
ni, err := s.CreateNode("host0")
if err != nil {
// Create node.
if ni, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) {
t.Fatalf("unexpected node: %#v", ni)
}
// Create another node.
if ni, err := s.CreateNode("host1"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
// Ensure that creating an existing node returns an error.
func TestStore_CreateNode_ErrNodeExists(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node.
if _, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
}
// Create it again.
if _, err := s.CreateNode("host0"); err != meta.ErrNodeExists {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can find a node by ID.
func TestStore_Node(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil {
t.Fatal(err)
}
}
// Find second node.
if ni, err := s.Node(2); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
// Ensure the store can find a node by host.
func TestStore_NodeByHost(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil {
t.Fatal(err)
}
}
// Find second node.
if ni, err := s.NodeByHost("host1"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
// Ensure the store can delete an existing node.
func TestStore_DeleteNode(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil {
t.Fatal(err)
}
}
// Remove second node.
if err := s.DeleteNode(2); err != nil {
t.Fatal(err)
}
// Ensure remaining nodes are correct.
if ni, _ := s.Node(1); *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) {
t.Fatalf("unexpected node(1): %#v", ni)
}
if ni, _ := s.Node(2); ni != nil {
t.Fatalf("unexpected node(2): %#v", ni)
}
if ni, _ := s.Node(3); *ni != (meta.NodeInfo{ID: 3, Host: "host2"}) {
t.Fatalf("unexpected node(3): %#v", ni)
}
}
// Ensure the store returns an error when deleting a node that doesn't exist.
func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
if err := s.DeleteNode(2); err != meta.ErrNodeNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can create a new database.
func TestStore_CreateDatabase(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if di, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db0"}) {
t.Fatalf("unexpected database: %#v", di)
}
// Create another database.
if di, err := s.CreateDatabase("db1"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db1"}) {
t.Fatalf("unexpected database: %#v", di)
}
}
// Ensure the store can delete an existing database.
func TestStore_DropDatabase(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create databases.
for i := 0; i < 3; i++ {
if _, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
t.Fatal(err)
}
}
// Remove a database.
if err := s.DropDatabase("db1"); err != nil {
t.Fatal(err)
}
// Ensure remaining nodes are correct.
if di, _ := s.Database("db0"); !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db0"}) {
t.Fatalf("unexpected database(0): %#v", di)
}
if di, _ := s.Database("db1"); di != nil {
t.Fatalf("unexpected database(1): %#v", di)
}
if di, _ := s.Database("db2"); !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db2"}) {
t.Fatalf("unexpected database(2): %#v", di)
}
}
// Ensure the store returns an error when dropping a database that doesn't exist.
func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
if err := s.DropDatabase("no_such_database"); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can create a retention policy on a database.
func TestStore_CreateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
// Create policy on database.
if rpi, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{
Name: "rp0",
ReplicaN: 2,
Duration: 48 * time.Hour,
}); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{
Name: "rp0",
ReplicaN: 2,
Duration: 48 * time.Hour,
ShardGroupDuration: 24 * time.Hour,
}) {
t.Fatalf("unexpected policy: %#v", rpi)
}
}
// Ensure the store can delete a retention policy.
func TestStore_DropRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
// Create policies.
for i := 0; i < 3; i++ {
if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i)}); err != nil {
t.Fatal(err)
}
}
// Remove a policy.
if err := s.DropRetentionPolicy("db0", "rp1"); err != nil {
t.Fatal(err)
}
// Ensure remaining policies are correct.
if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(0): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp1"); rpi != nil {
t.Fatalf("unexpected policy(1): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(2): %#v", rpi)
}
}
// Ensure the store can set the default retention policy on a database.
func TestStore_SetDefaultRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
// Set default policy.
if err := s.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
// Ensure default policy is set.
if di, _ := s.Database("db0"); di.DefaultRetentionPolicy != "rp0" {
t.Fatalf("unexpected default retention policy: %s", di.DefaultRetentionPolicy)
}
}
// Ensure the store can update a retention policy.
func TestStore_UpdateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
t.Fatal(err)
}
// Update policy.
var rpu meta.RetentionPolicyUpdate
rpu.SetName("rp1")
rpu.SetDuration(10 * time.Hour)
rpu.SetReplicaN(3)
if err := s.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil {
t.Fatal(err)
}
// Ensure policy is updated.
if rpi, err := s.RetentionPolicy("db0", "rp1"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{
Name: "rp1",
Duration: 10 * time.Hour,
ShardGroupDuration: 7 * 24 * time.Hour,
ReplicaN: 3,
}) {
t.Fatalf("unexpected policy: %#v", rpi)
}
}
// Ensure the store can create a shard group on a retention policy.
func TestStore_CreateShardGroup(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node & database.
if _, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}
// Create policy on database.
if sgi, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if sgi.ID != 1 {
t.Fatalf("unexpected shard group: %#v", sgi)
}
}
// Ensure the store can delete an existing shard group.
func TestStore_DeleteShardGroup(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node, database, policy, & group.
if _, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}
// Remove policy from database.
if err := s.DeleteShardGroup("db0", "rp0", 1); err != nil {
t.Fatal(err)
}
}
// Ensure the store can create a new continuous query.
func TestStore_CreateContinuousQuery(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create query.
if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
}
}
// Ensure that creating an existing continuous query returns an error.
func TestStore_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create continuous query.
if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
}
// Create it again.
if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != meta.ErrContinuousQueryExists {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can delete a continuous query.
func TestStore_DropContinuousQuery(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create queries.
if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil {
t.Fatal(err)
} else if err = s.CreateContinuousQuery("SELECT count() FROM bar"); err != nil {
t.Fatal(err)
} else if err = s.CreateContinuousQuery("SELECT count() FROM baz"); err != nil {
t.Fatal(err)
}
// Remove one of the queries.
if err := s.DropContinuousQuery("SELECT count() FROM bar"); err != nil {
t.Fatal(err)
}
// Ensure the resulting queries are correct.
if a, err := s.ContinuousQueries(); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(a, []meta.ContinuousQueryInfo{
{Query: "SELECT count() FROM foo"},
{Query: "SELECT count() FROM baz"},
}) {
t.Fatalf("unexpected queries: %#v", a)
}
}
// Ensure the store can create a user.
func TestStore_CreateUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create user.
if ui, err := s.CreateUser("susy", "pass", true); err != nil {
t.Fatal(err)
} else if ui.Name != "susy" || ui.Hash == "" || ui.Admin != true {
t.Fatalf("unexpected user: %#v", ui)
}
}
// Ensure the store can remove a user.
func TestStore_DropUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create users.
if _, err := s.CreateUser("susy", "pass", true); err != nil {
t.Fatal(err)
} else if _, err := s.CreateUser("bob", "pass", true); err != nil {
t.Fatal(err)
}
// Remove user.
if err := s.DropUser("bob"); err != nil {
t.Fatal(err)
}
// Verify user was removed.
if a, err := s.Users(); err != nil {
t.Fatal(err)
} else if len(a) != 1 {
t.Fatalf("unexpected user count: %d", len(a))
} else if a[0].Name != "susy" {
t.Fatalf("unexpected user: %s", a[0].Name)
}
}
// Ensure the store can update a user.
func TestStore_UpdateUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create users.
if _, err := s.CreateUser("susy", "pass", true); err != nil {
t.Fatal(err)
} else if _, err := s.CreateUser("bob", "pass", true); err != nil {
t.Fatal(err)
}
// Store password hash for bob.
ui, err := s.User("bob")
if err != nil {
t.Fatal(err)
}
// Update user.
if err := s.UpdateUser("bob", "XXX"); err != nil {
t.Fatal(err)
}
// Verify password hash was updated.
if other, err := s.User("bob"); err != nil {
t.Fatal(err)
} else if ui.Hash == other.Hash {
t.Fatal("password hash did not change")
}
}
// Store is a test wrapper for meta.Store.