diff --git a/meta/data.go b/meta/data.go index a07d680bd5..d745163bb6 100644 --- a/meta/data.go +++ b/meta/data.go @@ -1,7 +1,6 @@ package meta import ( - "errors" "time" "github.com/gogo/protobuf/proto" @@ -11,35 +10,27 @@ import ( //go:generate protoc --gogo_out=. internal/meta.proto -var ( - // ErrNodeExists is returned when creating a node that already exists. - ErrNodeExists = errors.New("node already exists") -) - // Data represents the top level collection of all metadata. type Data struct { - Version uint64 // autoincrementing version - MaxNodeID uint64 - Nodes []NodeInfo - Databases []DatabaseInfo - Users []UserInfo + Version uint64 // autoincrementing version + Nodes []NodeInfo + Databases []DatabaseInfo + Users []UserInfo + ContinuousQueries []ContinuousQueryInfo + + MaxNodeID uint64 + MaxShardGroupID uint64 + MaxShardID uint64 } -// CreateNode returns a new instance of Data with a new node. -func (data *Data) CreateNode(host string) (*Data, error) { - if data.NodeByHost(host) != nil { - return nil, ErrNodeExists +// Node returns a node by id. +func (data *Data) Node(id uint64) *NodeInfo { + for i := range data.Nodes { + if data.Nodes[i].ID == id { + return &data.Nodes[i] + } } - - // Clone and append new node. - other := data.Clone() - other.MaxNodeID++ - other.Nodes = append(other.Nodes, NodeInfo{ - ID: other.MaxNodeID, - Host: host, - }) - - return other, nil + return nil } // NodeByHost returns a node by hostname. @@ -52,29 +43,395 @@ func (data *Data) NodeByHost(host string) *NodeInfo { return nil } +// CreateNode adds a node to the metadata. +func (data *Data) CreateNode(host string) error { + // Ensure a node with the same host doesn't already exist. + if data.NodeByHost(host) != nil { + return ErrNodeExists + } + + // Append new node. + data.MaxNodeID++ + data.Nodes = append(data.Nodes, NodeInfo{ + ID: data.MaxNodeID, + Host: host, + }) + + return nil +} + +// DeleteNode removes a node from the metadata. +func (data *Data) DeleteNode(id uint64) error { + for i := range data.Nodes { + if data.Nodes[i].ID == id { + data.Nodes = append(data.Nodes[:i], data.Nodes[i+1:]...) + return nil + } + } + return ErrNodeNotFound +} + +// Database returns a database by name. +func (data *Data) Database(name string) *DatabaseInfo { + for i := range data.Databases { + if data.Databases[i].Name == name { + return &data.Databases[i] + } + } + return nil +} + +// CreateDatabase creates a new database. +// Returns an error if name is blank or if a database with the same name already exists. +func (data *Data) CreateDatabase(name string) error { + if name == "" { + return ErrDatabaseNameRequired + } else if data.Database(name) != nil { + return ErrDatabaseExists + } + + // Append new node. + data.Databases = append(data.Databases, DatabaseInfo{Name: name}) + + return nil +} + +// DropDatabase removes a database by name. +func (data *Data) DropDatabase(name string) error { + for i := range data.Databases { + if data.Databases[i].Name == name { + data.Databases = append(data.Databases[:i], data.Databases[i+1:]...) + return nil + } + } + return ErrDatabaseNotFound +} + +// RetentionPolicy returns a retention policy for a database by name. +func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) { + di := data.Database(database) + if di == nil { + return nil, ErrDatabaseNotFound + } + + for i := range di.RetentionPolicies { + if di.RetentionPolicies[i].Name == name { + return &di.RetentionPolicies[i], nil + } + } + return nil, nil +} + +// CreateRetentionPolicy creates a new retention policy on a database. +// Returns an error if name is blank or if a database does not exist. +func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) error { + // Validate retention policy. + if rpi.Name == "" { + return ErrRetentionPolicyNameRequired + } + + // Find database. + di := data.Database(database) + if di == nil { + return ErrDatabaseNotFound + } else if di.RetentionPolicy(rpi.Name) != nil { + return ErrRetentionPolicyExists + } + + // Append new policy. + di.RetentionPolicies = append(di.RetentionPolicies, RetentionPolicyInfo{ + Name: rpi.Name, + Duration: rpi.Duration, + ShardGroupDuration: shardGroupDuration(rpi.Duration), + ReplicaN: rpi.ReplicaN, + }) + + return nil +} + +// DropRetentionPolicy removes a retention policy from a database by name. +func (data *Data) DropRetentionPolicy(database, name string) error { + // Find database. + di := data.Database(database) + if di == nil { + return ErrDatabaseNotFound + } + + // Remove from list. + for i := range di.RetentionPolicies { + if di.RetentionPolicies[i].Name == name { + di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...) + return nil + } + } + return ErrRetentionPolicyNotFound +} + +// UpdateRetentionPolicy updates an existing retention policy. +func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { + // Find database. + di := data.Database(database) + if di == nil { + return ErrDatabaseNotFound + } + + // Find policy. + rpi := di.RetentionPolicy(name) + if rpi == nil { + return ErrRetentionPolicyNotFound + } + + // Ensure new policy doesn't match an existing policy. + if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil { + return ErrRetentionPolicyNameExists + } + + // Update fields. + if rpu.Name != nil { + rpi.Name = *rpu.Name + } + if rpu.Duration != nil { + rpi.Duration = *rpu.Duration + } + if rpu.ReplicaN != nil { + rpi.ReplicaN = *rpu.ReplicaN + } + + return nil +} + +// SetDefaultRetentionPolicy sets the default retention policy for a database. +func (data *Data) SetDefaultRetentionPolicy(database, name string) error { + // Find database and verify policy exists. + di := data.Database(database) + if di == nil { + return ErrDatabaseNotFound + } else if di.RetentionPolicy(name) == nil { + return ErrRetentionPolicyNotFound + } + + // Set default policy. + di.DefaultRetentionPolicy = name + + return nil +} + +// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp. +func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { + // Find retention policy. + rpi, err := data.RetentionPolicy(database, policy) + if err != nil { + return nil, err + } else if rpi == nil { + return nil, ErrRetentionPolicyNotFound + } + + return rpi.ShardGroupByTimestamp(timestamp), nil +} + +// CreateShardGroup creates a shard group on a database and policy for a given timestamp. +func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error { + // Ensure there are nodes in the metadata. + if len(data.Nodes) == 0 { + return ErrNodesRequired + } + + // Find retention policy. + rpi, err := data.RetentionPolicy(database, policy) + if err != nil { + return err + } else if rpi == nil { + return ErrRetentionPolicyNotFound + } + + // Verify that shard group doesn't already exist for this timestamp. + if rpi.ShardGroupByTimestamp(timestamp) != nil { + return ErrShardGroupExists + } + + // Require at least one replica but no more replicas than nodes. + replicaN := rpi.ReplicaN + if replicaN == 0 { + replicaN = 1 + } else if replicaN > len(data.Nodes) { + replicaN = len(data.Nodes) + } + + // Determine shard count by node count divided by replication factor. + // This will ensure nodes will get distributed across nodes evenly and + // replicated the correct number of times. + shardN := len(data.Nodes) / replicaN + + // Create the shard group. + data.MaxShardGroupID++ + sgi := ShardGroupInfo{} + sgi.ID = data.MaxShardGroupID + sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC() + sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC() + + // Create shards on the group. + sgi.Shards = make([]ShardInfo, shardN) + for i := range sgi.Shards { + data.MaxShardID++ + sgi.Shards[i] = ShardInfo{ID: data.MaxShardID} + } + + // Assign data nodes to shards via round robin. + // Start from a repeatably "random" place in the node list. + nodeIndex := int(data.Version % uint64(len(data.Nodes))) + for i := range sgi.Shards { + si := &sgi.Shards[i] + for j := 0; j < replicaN; j++ { + nodeID := data.Nodes[nodeIndex%len(data.Nodes)].ID + si.OwnerIDs = append(si.OwnerIDs, nodeID) + nodeIndex++ + } + } + + // Retention policy has a new shard group, so update the policy. + rpi.ShardGroups = append(rpi.ShardGroups, sgi) + + return nil +} + +// DeleteShardGroup removes a shard group from a database and retention policy by id. +func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { + // Find retention policy. + rpi, err := data.RetentionPolicy(database, policy) + if err != nil { + return err + } else if rpi == nil { + return ErrRetentionPolicyNotFound + } + + // Find shard group by ID and remove it. + for i := range rpi.ShardGroups { + if rpi.ShardGroups[i].ID == id { + rpi.ShardGroups = append(rpi.ShardGroups[:i], rpi.ShardGroups[i+1:]...) + return nil + } + } + + return ErrShardGroupNotFound +} + +// CreateContinuousQuery adds a continuous query. +func (data *Data) CreateContinuousQuery(query string) error { + // Ensure the query doesn't already exist. + for i := range data.ContinuousQueries { + if data.ContinuousQueries[i].Query == query { + return ErrContinuousQueryExists + } + } + + // Append new query. + data.ContinuousQueries = append(data.ContinuousQueries, ContinuousQueryInfo{ + Query: query, + }) + + return nil +} + +// DropContinuousQuery removes a continuous query. +func (data *Data) DropContinuousQuery(query string) error { + for i := range data.ContinuousQueries { + if data.ContinuousQueries[i].Query == query { + data.ContinuousQueries = append(data.ContinuousQueries[:i], data.ContinuousQueries[i+1:]...) + return nil + } + } + return ErrContinuousQueryNotFound +} + +// User returns a user by username. +func (data *Data) User(username string) *UserInfo { + for i := range data.Users { + if data.Users[i].Name == username { + return &data.Users[i] + } + } + return nil +} + +// CreateUser creates a new user. +func (data *Data) CreateUser(name, hash string, admin bool) error { + // Ensure the user doesn't already exist. + if name == "" { + return ErrUsernameRequired + } else if data.User(name) != nil { + return ErrUserExists + } + + // Append new user. + data.Users = append(data.Users, UserInfo{ + Name: name, + Hash: hash, + Admin: admin, + }) + + return nil +} + +// DropUser removes an existing user by name. +func (data *Data) DropUser(name string) error { + for i := range data.Users { + if data.Users[i].Name == name { + data.Users = append(data.Users[:i], data.Users[i+1:]...) + return nil + } + } + return ErrUserNotFound +} + +// UpdateUser updates the password hash of an existing user. +func (data *Data) UpdateUser(name, hash string) error { + for i := range data.Users { + if data.Users[i].Name == name { + data.Users[i].Hash = hash + return nil + } + } + return ErrUserNotFound +} + // Clone returns a copy of data with a new version. func (data *Data) Clone() *Data { - other := &Data{Version: data.Version + 1} + other := *data + other.Version++ // Copy nodes. - other.Nodes = make([]NodeInfo, len(data.Nodes)) - for i := range data.Nodes { - other.Nodes[i] = data.Nodes[i].Clone() + if data.Nodes != nil { + other.Nodes = make([]NodeInfo, len(data.Nodes)) + for i := range data.Nodes { + other.Nodes[i] = data.Nodes[i].clone() + } } // Deep copy databases. - other.Databases = make([]DatabaseInfo, len(data.Databases)) - for i := range data.Databases { - other.Databases[i] = data.Databases[i].Clone() + if data.Databases != nil { + other.Databases = make([]DatabaseInfo, len(data.Databases)) + for i := range data.Databases { + other.Databases[i] = data.Databases[i].clone() + } + } + + // Copy continuous queries. + if data.ContinuousQueries != nil { + other.ContinuousQueries = make([]ContinuousQueryInfo, len(data.ContinuousQueries)) + for i := range data.ContinuousQueries { + other.ContinuousQueries[i] = data.ContinuousQueries[i].clone() + } } // Copy users. - other.Users = make([]UserInfo, len(data.Users)) - for i := range data.Users { - other.Users[i] = data.Users[i].Clone() + if data.Users != nil { + other.Users = make([]UserInfo, len(data.Users)) + for i := range data.Users { + other.Users[i] = data.Users[i].clone() + } } - return other + return &other } // NodeInfo represents information about a single node in the cluster. @@ -83,8 +440,8 @@ type NodeInfo struct { Host string } -// Clone returns a deep copy of ni. -func (ni NodeInfo) Clone() NodeInfo { return ni } +// clone returns a deep copy of ni. +func (ni NodeInfo) clone() NodeInfo { return ni } // MarshalBinary encodes the object to a binary format. func (info *NodeInfo) MarshalBinary() ([]byte, error) { @@ -109,32 +466,33 @@ func (info *NodeInfo) UnmarshalBinary(buf []byte) error { type DatabaseInfo struct { Name string DefaultRetentionPolicy string - Policies []RetentionPolicyInfo - ContinuousQueries []ContinuousQueryInfo + RetentionPolicies []RetentionPolicyInfo } -// Clone returns a deep copy of di. -func (di DatabaseInfo) Clone() DatabaseInfo { +// RetentionPolicy returns a retention policy by name. +func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo { + for i := range di.RetentionPolicies { + if di.RetentionPolicies[i].Name == name { + return &di.RetentionPolicies[i] + } + } + return nil +} + +// clone returns a deep copy of di. +func (di DatabaseInfo) clone() DatabaseInfo { other := di - other.Policies = make([]RetentionPolicyInfo, len(di.Policies)) - for i := range di.Policies { - other.Policies[i] = di.Policies[i].Clone() - } - - other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries)) - for i := range di.ContinuousQueries { - other.ContinuousQueries[i] = di.ContinuousQueries[i].Clone() + if di.RetentionPolicies != nil { + other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies)) + for i := range di.RetentionPolicies { + other.RetentionPolicies[i] = di.RetentionPolicies[i].clone() + } } return other } -// RetentionPolicy returns a policy on the database by name. -func (db *DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo { - panic("not yet implemented") -} - // RetentionPolicyInfo represents metadata about a retention policy. type RetentionPolicyInfo struct { Name string @@ -144,18 +502,50 @@ type RetentionPolicyInfo struct { ShardGroups []ShardGroupInfo } -// Clone returns a deep copy of rpi. -func (rpi RetentionPolicyInfo) Clone() RetentionPolicyInfo { +// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp. +func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo { + for i := range rpi.ShardGroups { + if rpi.ShardGroups[i].Contains(timestamp) { + return &rpi.ShardGroups[i] + } + } + return nil +} + +// protobuf returns a protocol buffers object. +func (rpi *RetentionPolicyInfo) protobuf() *internal.RetentionPolicyInfo { + return &internal.RetentionPolicyInfo{ + Name: proto.String(rpi.Name), + ReplicaN: proto.Uint32(uint32(rpi.ReplicaN)), + Duration: proto.Int64(int64(rpi.Duration)), + ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)), + } +} + +// clone returns a deep copy of rpi. +func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo { other := rpi - other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups)) - for i := range rpi.ShardGroups { - other.ShardGroups[i] = rpi.ShardGroups[i].Clone() + if rpi.ShardGroups != nil { + other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups)) + for i := range rpi.ShardGroups { + other.ShardGroups[i] = rpi.ShardGroups[i].clone() + } } return other } +// shardGroupDuration returns the duration for a shard group based on a policy duration. +func shardGroupDuration(d time.Duration) time.Duration { + if d >= 180*24*time.Hour || d == 0 { // 6 months or 0 + return 7 * 24 * time.Hour + } else if d >= 2*24*time.Hour { // 2 days + return 1 * 24 * time.Hour + } + return 1 * time.Hour +} + // ShardGroupInfo represents metadata about a shard group. type ShardGroupInfo struct { ID uint64 @@ -164,13 +554,20 @@ type ShardGroupInfo struct { Shards []ShardInfo } -// Clone returns a deep copy of sgi. -func (sgi ShardGroupInfo) Clone() ShardGroupInfo { +// Contains return true if the shard group contains data for the timestamp. +func (sgi *ShardGroupInfo) Contains(timestamp time.Time) bool { + return !sgi.StartTime.After(timestamp) && sgi.EndTime.After(timestamp) +} + +// clone returns a deep copy of sgi. +func (sgi ShardGroupInfo) clone() ShardGroupInfo { other := sgi - other.Shards = make([]ShardInfo, len(sgi.Shards)) - for i := range sgi.Shards { - other.Shards[i] = sgi.Shards[i].Clone() + if sgi.Shards != nil { + other.Shards = make([]ShardInfo, len(sgi.Shards)) + for i := range sgi.Shards { + other.Shards[i] = sgi.Shards[i].clone() + } } return other @@ -187,12 +584,14 @@ type ShardInfo struct { OwnerIDs []uint64 } -// Clone returns a deep copy of si. -func (si ShardInfo) Clone() ShardInfo { +// clone returns a deep copy of si. +func (si ShardInfo) clone() ShardInfo { other := si - other.OwnerIDs = make([]uint64, len(si.OwnerIDs)) - copy(other.OwnerIDs, si.OwnerIDs) + if si.OwnerIDs != nil { + other.OwnerIDs = make([]uint64, len(si.OwnerIDs)) + copy(other.OwnerIDs, si.OwnerIDs) + } return other } @@ -202,8 +601,8 @@ type ContinuousQueryInfo struct { Query string } -// Clone returns a deep copy of cqi. -func (cqi ContinuousQueryInfo) Clone() ContinuousQueryInfo { return cqi } +// clone returns a deep copy of cqi. +func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi } // UserInfo represents metadata about a user in the system. type UserInfo struct { @@ -213,13 +612,15 @@ type UserInfo struct { Privileges map[string]influxql.Privilege } -// Clone returns a deep copy of si. -func (ui UserInfo) Clone() UserInfo { +// clone returns a deep copy of si. +func (ui UserInfo) clone() UserInfo { other := ui - other.Privileges = make(map[string]influxql.Privilege) - for k, v := range ui.Privileges { - other.Privileges[k] = v + if ui.Privileges != nil { + other.Privileges = make(map[string]influxql.Privilege) + for k, v := range ui.Privileges { + other.Privileges[k] = v + } } return other diff --git a/meta/data_test.go b/meta/data_test.go index 64ac75f027..9ceb621a44 100644 --- a/meta/data_test.go +++ b/meta/data_test.go @@ -1,6 +1,9 @@ package meta_test +// import "github.com/davecgh/go-spew/spew" + import ( + "fmt" "reflect" "testing" "time" @@ -9,6 +12,421 @@ import ( "github.com/influxdb/influxdb/meta" ) +// Ensure a node can be created. +func TestData_CreateNode(t *testing.T) { + var data meta.Data + if err := data.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.Nodes, []meta.NodeInfo{{ID: 1, Host: "host0"}}) { + t.Fatalf("unexpected node: %#v", data.Nodes[0]) + } +} + +// Ensure a node can be removed. +func TestData_DeleteNode(t *testing.T) { + var data meta.Data + if err := data.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if err = data.CreateNode("host1"); err != nil { + t.Fatal(err) + } else if err := data.CreateNode("host2"); err != nil { + t.Fatal(err) + } + + if err := data.DeleteNode(1); err != nil { + t.Fatal(err) + } else if len(data.Nodes) != 2 { + t.Fatalf("unexpected node count: %d", len(data.Nodes)) + } else if data.Nodes[0] != (meta.NodeInfo{ID: 2, Host: "host1"}) { + t.Fatalf("unexpected node: %#v", data.Nodes[0]) + } else if data.Nodes[1] != (meta.NodeInfo{ID: 3, Host: "host2"}) { + t.Fatalf("unexpected node: %#v", data.Nodes[1]) + } +} + +// Ensure a database can be created. +func TestData_CreateDatabase(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.Databases, []meta.DatabaseInfo{{Name: "db0"}}) { + t.Fatalf("unexpected databases: %#v", data.Databases) + } +} + +// Ensure that creating a database without a name returns an error. +func TestData_CreateDatabase_ErrNameRequired(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase(""); err != meta.ErrDatabaseNameRequired { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that creating an already existing database returns an error. +func TestData_CreateDatabase_ErrDatabaseExists(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + if err := data.CreateDatabase("db0"); err != meta.ErrDatabaseExists { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure a database can be removed. +func TestData_DropDatabase(t *testing.T) { + var data meta.Data + for i := 0; i < 3; i++ { + if err := data.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil { + t.Fatal(err) + } + } + + if err := data.DropDatabase("db1"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.Databases, []meta.DatabaseInfo{{Name: "db0"}, {Name: "db2"}}) { + t.Fatalf("unexpected databases: %#v", data.Databases) + } +} + +// Ensure a retention policy can be created. +func TestData_CreateRetentionPolicy(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + + // Create policy. + if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{ + Name: "rp0", + ReplicaN: 2, + Duration: 4 * time.Hour, + }); err != nil { + t.Fatal(err) + } + + // Verify policy exists. + if !reflect.DeepEqual(data.Databases[0].RetentionPolicies, []meta.RetentionPolicyInfo{ + { + Name: "rp0", + ReplicaN: 2, + Duration: 4 * time.Hour, + ShardGroupDuration: 1 * time.Hour, + }, + }) { + t.Fatalf("unexpected policies: %#v", data.Databases[0].RetentionPolicies) + } +} + +// Ensure that creating a policy without a name returns an error. +func TestData_CreateRetentionPolicy_ErrNameRequired(t *testing.T) { + var data meta.Data + if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: ""}); err != meta.ErrRetentionPolicyNameRequired { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that creating a retention policy on a non-existent database returns an error. +func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { + var data meta.Data + if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrDatabaseNotFound { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that creating an already existing policy returns an error. +func TestData_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrRetentionPolicyExists { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that a retention policy can be updated. +func TestData_UpdateRetentionPolicy(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + + // Update the policy. + var rpu meta.RetentionPolicyUpdate + rpu.SetName("rp1") + rpu.SetDuration(10 * time.Hour) + rpu.SetReplicaN(3) + if err := data.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil { + t.Fatal(err) + } + + // Verify the policy was changed. + if rpi, _ := data.RetentionPolicy("db0", "rp1"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{ + Name: "rp1", + Duration: 10 * time.Hour, + ShardGroupDuration: 604800000000000, + ReplicaN: 3, + }) { + t.Fatalf("unexpected policy: %#v", rpi) + } +} + +// Ensure a retention policy can be removed. +func TestData_DropRetentionPolicy(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + + if err := data.DropRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } else if len(data.Databases[0].RetentionPolicies) != 0 { + t.Fatalf("unexpected policies: %#v", data.Databases[0].RetentionPolicies) + } +} + +// Ensure an error is returned when deleting a policy from a non-existent database. +func TestData_DropRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { + var data meta.Data + if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound { + t.Fatal(err) + } +} + +// Ensure an error is returned when deleting a non-existent policy. +func TestData_DropRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + if err := data.DropRetentionPolicy("db0", "rp0"); err != meta.ErrRetentionPolicyNotFound { + t.Fatal(err) + } +} + +// Ensure that a retention policy can be retrieved. +func TestData_RetentionPolicy(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp1"}); err != nil { + t.Fatal(err) + } + + if rpi, err := data.RetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{ + Name: "rp0", + ShardGroupDuration: 604800000000000, + }) { + t.Fatalf("unexpected value: %#v", rpi) + } +} + +// Ensure that retrieveing a policy from a non-existent database returns an error. +func TestData_RetentionPolicy_ErrDatabaseNotFound(t *testing.T) { + var data meta.Data + if _, err := data.RetentionPolicy("db0", "rp0"); err != meta.ErrDatabaseNotFound { + t.Fatal(err) + } +} + +// Ensure that a default retention policy can be set. +func TestData_SetDefaultRetentionPolicy(t *testing.T) { + var data meta.Data + if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + + // Verify there is no default policy on the database initially. + if name := data.Database("db0").DefaultRetentionPolicy; name != "" { + t.Fatalf("unexpected initial default retention policy: %s", name) + } + + // Set the default policy. + if err := data.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + // Verify the default policy is now set. + if name := data.Database("db0").DefaultRetentionPolicy; name != "rp0" { + t.Fatalf("unexpected default retention policy: %s", name) + } +} + +// Ensure that a shard group can be created on a database for a given timestamp. +func TestData_CreateShardGroup(t *testing.T) { + var data meta.Data + if err := data.CreateNode("node0"); err != nil { + t.Fatal(err) + } else if err = data.CreateNode("node1"); err != nil { + t.Fatal(err) + } else if err = data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } + + // Create shard group. + if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + // Verify the shard group was created. + if sgi, _ := data.ShardGroupByTimestamp("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); !reflect.DeepEqual(sgi, &meta.ShardGroupInfo{ + ID: 1, + StartTime: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), + EndTime: time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), + Shards: []meta.ShardInfo{ + {ID: 1, OwnerIDs: []uint64{1}}, + {ID: 2, OwnerIDs: []uint64{2}}, + }, + }) { + t.Fatalf("unexpected shard group: %#v", sgi) + } +} + +// Ensure a shard group can be removed by ID. +func TestData_DeleteShardGroup(t *testing.T) { + var data meta.Data + if err := data.CreateNode("node0"); err != nil { + t.Fatal(err) + } else if err := data.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } else if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + if err := data.DeleteShardGroup("db0", "rp0", 1); err != nil { + t.Fatal(err) + } else if len(data.Databases[0].RetentionPolicies[0].ShardGroups) != 0 { + t.Fatalf("unexpected shard groups: %#v", data.Databases[0].RetentionPolicies[0].ShardGroups) + } +} + +// Ensure a continuous query can be created. +func TestData_CreateContinuousQuery(t *testing.T) { + var data meta.Data + if err := data.CreateContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.ContinuousQueries, []meta.ContinuousQueryInfo{{Query: "SELECT count() FROM foo"}}) { + t.Fatalf("unexpected queries: %#v", data.ContinuousQueries) + } +} + +// Ensure a continuous query can be removed. +func TestData_DropContinuousQuery(t *testing.T) { + var data meta.Data + if err := data.CreateContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } else if err = data.CreateContinuousQuery("SELECT count() FROM bar"); err != nil { + t.Fatal(err) + } + + if err := data.DropContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.ContinuousQueries, []meta.ContinuousQueryInfo{ + {Query: "SELECT count() FROM bar"}, + }) { + t.Fatalf("unexpected queries: %#v", data.ContinuousQueries) + } +} + +// Ensure a user can be created. +func TestData_CreateUser(t *testing.T) { + var data meta.Data + if err := data.CreateUser("susy", "ABC123", true); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.Users, []meta.UserInfo{ + {Name: "susy", Hash: "ABC123", Admin: true}, + }) { + t.Fatalf("unexpected users: %#v", data.Users) + } +} + +// Ensure that creating a user with no username returns an error. +func TestData_CreateUser_ErrUsernameRequired(t *testing.T) { + var data meta.Data + if err := data.CreateUser("", "", false); err != meta.ErrUsernameRequired { + t.Fatal(err) + } +} + +// Ensure that creating the same user twice returns an error. +func TestData_CreateUser_ErrUserExists(t *testing.T) { + var data meta.Data + if err := data.CreateUser("susy", "", false); err != nil { + t.Fatal(err) + } + if err := data.CreateUser("susy", "", false); err != meta.ErrUserExists { + t.Fatal(err) + } +} + +// Ensure a user can be removed. +func TestData_DropUser(t *testing.T) { + var data meta.Data + if err := data.CreateUser("susy", "", false); err != nil { + t.Fatal(err) + } else if err := data.CreateUser("bob", "", false); err != nil { + t.Fatal(err) + } + + if err := data.DropUser("bob"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.Users, []meta.UserInfo{ + {Name: "susy"}, + }) { + t.Fatalf("unexpected users: %#v", data.Users) + } +} + +// Ensure that removing a non-existent user returns an error. +func TestData_DropUser_ErrUserNotFound(t *testing.T) { + var data meta.Data + if err := data.DropUser("bob"); err != meta.ErrUserNotFound { + t.Fatal(err) + } +} + +// Ensure a user can be updated. +func TestData_UpdateUser(t *testing.T) { + var data meta.Data + if err := data.CreateUser("susy", "", false); err != nil { + t.Fatal(err) + } else if err := data.CreateUser("bob", "", false); err != nil { + t.Fatal(err) + } + + // Update password hash. + if err := data.UpdateUser("bob", "XXX"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data.User("bob"), &meta.UserInfo{Name: "bob", Hash: "XXX"}) { + t.Fatalf("unexpected user: %#v", data.User("bob")) + } +} + +// Ensure that updating a non-existent user returns an error. +func TestData_UpdateUser_ErrUserNotFound(t *testing.T) { + var data meta.Data + if err := data.UpdateUser("bob", "ZZZ"); err != meta.ErrUserNotFound { + t.Fatal(err) + } +} + // Ensure the data can be deeply copied. func TestData_Clone(t *testing.T) { data := meta.Data{ @@ -21,7 +439,7 @@ func TestData_Clone(t *testing.T) { { Name: "db0", DefaultRetentionPolicy: "default", - Policies: []meta.RetentionPolicyInfo{ + RetentionPolicies: []meta.RetentionPolicyInfo{ { Name: "rp0", ReplicaN: 3, @@ -42,9 +460,11 @@ func TestData_Clone(t *testing.T) { }, }, }, - ContinuousQueries: []meta.ContinuousQueryInfo{}, }, }, + ContinuousQueries: []meta.ContinuousQueryInfo{ + {Query: "SELECT count() FROM foo"}, + }, Users: []meta.UserInfo{ { Name: "susy", @@ -70,8 +490,8 @@ func TestData_Clone(t *testing.T) { } // Ensure that changing data in the clone does not affect the original. - other.Databases[0].Policies[0].ShardGroups[0].Shards[0].OwnerIDs[1] = 9 - if v := data.Databases[0].Policies[0].ShardGroups[0].Shards[0].OwnerIDs[1]; v != 3 { + other.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1] = 9 + if v := data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1]; v != 3 { t.Fatalf("editing clone changed original: %v", v) } } diff --git a/meta/errors.go b/meta/errors.go new file mode 100644 index 0000000000..917cba8b04 --- /dev/null +++ b/meta/errors.go @@ -0,0 +1,100 @@ +package meta + +import "errors" + +var ( + // ErrStoreOpen is returned when opening an already open store. + ErrStoreOpen = errors.New("store already open") + + // ErrStoreClosed is returned when closing an already closed store. + ErrStoreClosed = errors.New("raft store already closed") +) + +var ( + // ErrNodeExists is returned when creating an already existing node. + ErrNodeExists = errors.New("node already exists") + + // ErrNodeNotFound is returned when mutating a node that doesn't exist. + ErrNodeNotFound = errors.New("node not found") + + // ErrNodesRequired is returned when at least one node is required for an operation. + // This occurs when creating a shard group. + ErrNodesRequired = errors.New("at least one node required") +) + +var ( + // ErrDatabaseExists is returned when creating an already existing database. + ErrDatabaseExists = errors.New("database already exists") + + // ErrDatabaseNotFound is returned when mutating a database that doesn't exist. + ErrDatabaseNotFound = errors.New("database not found") + + // ErrDatabaseNameRequired is returned when creating a database without a name. + ErrDatabaseNameRequired = errors.New("database name required") +) + +var ( + // ErrRetentionPolicyExists is returned when creating an already existing policy. + ErrRetentionPolicyExists = errors.New("retention policy already exists") + + // ErrRetentionPolicyNotFound is returned when mutating a policy that doesn't exist. + ErrRetentionPolicyNotFound = errors.New("retention policy not found") + + // ErrRetentionPolicyNameRequired is returned when creating a policy without a name. + ErrRetentionPolicyNameRequired = errors.New("retention policy name required") + + // ErrRetentionPolicyNameExists is returned when renaming a policy to + // the same name as another existing policy. + ErrRetentionPolicyNameExists = errors.New("retention policy name already exists") +) + +var ( + // ErrShardGroupExists is returned when creating an already existing shard group. + ErrShardGroupExists = errors.New("shard group already exists") + + // ErrShardGroupNotFound is returned when mutating a shard group that doesn't exist. + ErrShardGroupNotFound = errors.New("shard group not found") +) + +var ( + // ErrContinuousQueryExists is returned when creating an already existing continuous query. + ErrContinuousQueryExists = errors.New("continuous query already exists") + + // ErrContinuousQueryNotFound is returned when removing a continuous query that doesn't exist. + ErrContinuousQueryNotFound = errors.New("continuous query not found") +) + +var ( + // ErrUserExists is returned when creating an already existing user. + ErrUserExists = errors.New("user already exists") + + // ErrUserNotFound is returned when mutating a user that doesn't exist. + ErrUserNotFound = errors.New("user not found") + + // ErrUsernameRequired is returned when creating a user without a username. + ErrUsernameRequired = errors.New("username required") +) + +var errs = [...]error{ + ErrStoreOpen, ErrStoreClosed, + ErrNodeExists, ErrNodeNotFound, + ErrDatabaseExists, ErrDatabaseNotFound, ErrDatabaseNameRequired, +} + +// errLookup stores a mapping of error strings to well defined error types. +var errLookup = make(map[string]error) + +func init() { + for _, err := range errs { + errLookup[err.Error()] = err + } +} + +// lookupError returns a known error reference, if one exists. +// Otherwise returns err. +func lookupError(err error) error { + if e, ok := errLookup[err.Error()]; ok { + return e + } + return err +} diff --git a/meta/internal/meta.pb.go b/meta/internal/meta.pb.go index 2959411a54..44bbc04028 100644 --- a/meta/internal/meta.pb.go +++ b/meta/internal/meta.pb.go @@ -19,24 +19,22 @@ It has these top-level messages: User UserPrivilege Command - CreateContinuousQueryCommand - DropContinuousQueryCommand CreateNodeCommand DeleteNodeCommand CreateDatabaseCommand DropDatabaseCommand - CreateDatabaseIfNotExistsCommand CreateRetentionPolicyCommand - CreateRetentionPolicyIfNotExistsCommand - DeleteRetentionPolicyCommand + DropRetentionPolicyCommand SetDefaultRetentionPolicyCommand UpdateRetentionPolicyCommand - CreateShardGroupIfNotExistsCommand + CreateShardGroupCommand + DeleteShardGroupCommand + CreateContinuousQueryCommand + DropContinuousQueryCommand CreateUserCommand - DeleteUserCommand + DropUserCommand UpdateUserCommand SetPrivilegeCommand - DeleteShardGroupCommand */ package internal @@ -50,65 +48,59 @@ var _ = math.Inf type Command_Type int32 const ( - Command_CreateContinuousQueryCommand Command_Type = 1 - Command_DropContinuousQueryCommand Command_Type = 2 - Command_CreateNodeCommand Command_Type = 3 - Command_DeleteNodeCommand Command_Type = 4 - Command_CreateDatabaseCommand Command_Type = 5 - Command_DropDatabaseCommand Command_Type = 6 - Command_CreateDatabaseIfNotExistsCommand Command_Type = 7 - Command_CreateRetentionPolicyCommand Command_Type = 8 - Command_CreateRetentionPolicyIfNotExistsCommand Command_Type = 9 - Command_DeleteRetentionPolicyCommand Command_Type = 10 - Command_SetDefaultRetentionPolicyCommand Command_Type = 11 - Command_UpdateRetentionPolicyCommand Command_Type = 12 - Command_CreateShardGroupIfNotExistsCommand Command_Type = 13 - Command_CreateUserCommand Command_Type = 14 - Command_DeleteUserCommand Command_Type = 15 - Command_UpdateUserCommand Command_Type = 16 - Command_SetPrivilegeCommand Command_Type = 17 - Command_DeleteShardGroupCommand Command_Type = 18 + Command_CreateNodeCommand Command_Type = 1 + Command_DeleteNodeCommand Command_Type = 2 + Command_CreateDatabaseCommand Command_Type = 3 + Command_DropDatabaseCommand Command_Type = 4 + Command_CreateRetentionPolicyCommand Command_Type = 5 + Command_DropRetentionPolicyCommand Command_Type = 6 + Command_SetDefaultRetentionPolicyCommand Command_Type = 7 + Command_UpdateRetentionPolicyCommand Command_Type = 8 + Command_CreateShardGroupCommand Command_Type = 9 + Command_DeleteShardGroupCommand Command_Type = 10 + Command_CreateContinuousQueryCommand Command_Type = 11 + Command_DropContinuousQueryCommand Command_Type = 12 + Command_CreateUserCommand Command_Type = 13 + Command_DropUserCommand Command_Type = 14 + Command_UpdateUserCommand Command_Type = 15 + Command_SetPrivilegeCommand Command_Type = 16 ) var Command_Type_name = map[int32]string{ - 1: "CreateContinuousQueryCommand", - 2: "DropContinuousQueryCommand", - 3: "CreateNodeCommand", - 4: "DeleteNodeCommand", - 5: "CreateDatabaseCommand", - 6: "DropDatabaseCommand", - 7: "CreateDatabaseIfNotExistsCommand", - 8: "CreateRetentionPolicyCommand", - 9: "CreateRetentionPolicyIfNotExistsCommand", - 10: "DeleteRetentionPolicyCommand", - 11: "SetDefaultRetentionPolicyCommand", - 12: "UpdateRetentionPolicyCommand", - 13: "CreateShardGroupIfNotExistsCommand", - 14: "CreateUserCommand", - 15: "DeleteUserCommand", - 16: "UpdateUserCommand", - 17: "SetPrivilegeCommand", - 18: "DeleteShardGroupCommand", + 1: "CreateNodeCommand", + 2: "DeleteNodeCommand", + 3: "CreateDatabaseCommand", + 4: "DropDatabaseCommand", + 5: "CreateRetentionPolicyCommand", + 6: "DropRetentionPolicyCommand", + 7: "SetDefaultRetentionPolicyCommand", + 8: "UpdateRetentionPolicyCommand", + 9: "CreateShardGroupCommand", + 10: "DeleteShardGroupCommand", + 11: "CreateContinuousQueryCommand", + 12: "DropContinuousQueryCommand", + 13: "CreateUserCommand", + 14: "DropUserCommand", + 15: "UpdateUserCommand", + 16: "SetPrivilegeCommand", } var Command_Type_value = map[string]int32{ - "CreateContinuousQueryCommand": 1, - "DropContinuousQueryCommand": 2, - "CreateNodeCommand": 3, - "DeleteNodeCommand": 4, - "CreateDatabaseCommand": 5, - "DropDatabaseCommand": 6, - "CreateDatabaseIfNotExistsCommand": 7, - "CreateRetentionPolicyCommand": 8, - "CreateRetentionPolicyIfNotExistsCommand": 9, - "DeleteRetentionPolicyCommand": 10, - "SetDefaultRetentionPolicyCommand": 11, - "UpdateRetentionPolicyCommand": 12, - "CreateShardGroupIfNotExistsCommand": 13, - "CreateUserCommand": 14, - "DeleteUserCommand": 15, - "UpdateUserCommand": 16, - "SetPrivilegeCommand": 17, - "DeleteShardGroupCommand": 18, + "CreateNodeCommand": 1, + "DeleteNodeCommand": 2, + "CreateDatabaseCommand": 3, + "DropDatabaseCommand": 4, + "CreateRetentionPolicyCommand": 5, + "DropRetentionPolicyCommand": 6, + "SetDefaultRetentionPolicyCommand": 7, + "UpdateRetentionPolicyCommand": 8, + "CreateShardGroupCommand": 9, + "DeleteShardGroupCommand": 10, + "CreateContinuousQueryCommand": 11, + "DropContinuousQueryCommand": 12, + "CreateUserCommand": 13, + "DropUserCommand": 14, + "UpdateUserCommand": 15, + "SetPrivilegeCommand": 16, } func (x Command_Type) Enum() *Command_Type { @@ -436,55 +428,7 @@ func (m *Command) GetType() Command_Type { if m != nil && m.Type != nil { return *m.Type } - return Command_CreateContinuousQueryCommand -} - -type CreateContinuousQueryCommand struct { - Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CreateContinuousQueryCommand) Reset() { *m = CreateContinuousQueryCommand{} } -func (m *CreateContinuousQueryCommand) String() string { return proto.CompactTextString(m) } -func (*CreateContinuousQueryCommand) ProtoMessage() {} - -func (m *CreateContinuousQueryCommand) GetQuery() string { - if m != nil && m.Query != nil { - return *m.Query - } - return "" -} - -var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{ - ExtendedType: (*Command)(nil), - ExtensionType: (*CreateContinuousQueryCommand)(nil), - Field: 100, - Name: "internal.CreateContinuousQueryCommand.command", - Tag: "bytes,100,req,name=command", -} - -type DropContinuousQueryCommand struct { - Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *DropContinuousQueryCommand) Reset() { *m = DropContinuousQueryCommand{} } -func (m *DropContinuousQueryCommand) String() string { return proto.CompactTextString(m) } -func (*DropContinuousQueryCommand) ProtoMessage() {} - -func (m *DropContinuousQueryCommand) GetQuery() string { - if m != nil && m.Query != nil { - return *m.Query - } - return "" -} - -var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{ - ExtendedType: (*Command)(nil), - ExtensionType: (*DropContinuousQueryCommand)(nil), - Field: 101, - Name: "internal.DropContinuousQueryCommand.command", - Tag: "bytes,101,req,name=command", + return Command_CreateNodeCommand } type CreateNodeCommand struct { @@ -506,13 +450,13 @@ func (m *CreateNodeCommand) GetHost() string { var E_CreateNodeCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*CreateNodeCommand)(nil), - Field: 102, + Field: 101, Name: "internal.CreateNodeCommand.command", - Tag: "bytes,102,req,name=command", + Tag: "bytes,101,opt,name=command", } type DeleteNodeCommand struct { - ID *string `protobuf:"bytes,1,req" json:"ID,omitempty"` + ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -520,19 +464,19 @@ func (m *DeleteNodeCommand) Reset() { *m = DeleteNodeCommand{} } func (m *DeleteNodeCommand) String() string { return proto.CompactTextString(m) } func (*DeleteNodeCommand) ProtoMessage() {} -func (m *DeleteNodeCommand) GetID() string { +func (m *DeleteNodeCommand) GetID() uint64 { if m != nil && m.ID != nil { return *m.ID } - return "" + return 0 } var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*DeleteNodeCommand)(nil), - Field: 103, + Field: 102, Name: "internal.DeleteNodeCommand.command", - Tag: "bytes,103,req,name=command", + Tag: "bytes,102,opt,name=command", } type CreateDatabaseCommand struct { @@ -554,9 +498,9 @@ func (m *CreateDatabaseCommand) GetName() string { var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*CreateDatabaseCommand)(nil), - Field: 104, + Field: 103, Name: "internal.CreateDatabaseCommand.command", - Tag: "bytes,104,req,name=command", + Tag: "bytes,103,opt,name=command", } type DropDatabaseCommand struct { @@ -578,33 +522,9 @@ func (m *DropDatabaseCommand) GetName() string { var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*DropDatabaseCommand)(nil), - Field: 105, + Field: 104, Name: "internal.DropDatabaseCommand.command", - Tag: "bytes,105,req,name=command", -} - -type CreateDatabaseIfNotExistsCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CreateDatabaseIfNotExistsCommand) Reset() { *m = CreateDatabaseIfNotExistsCommand{} } -func (m *CreateDatabaseIfNotExistsCommand) String() string { return proto.CompactTextString(m) } -func (*CreateDatabaseIfNotExistsCommand) ProtoMessage() {} - -func (m *CreateDatabaseIfNotExistsCommand) GetName() string { - if m != nil && m.Name != nil { - return *m.Name - } - return "" -} - -var E_CreateDatabaseIfNotExistsCommand_Command = &proto.ExtensionDesc{ - ExtendedType: (*Command)(nil), - ExtensionType: (*CreateDatabaseIfNotExistsCommand)(nil), - Field: 106, - Name: "internal.CreateDatabaseIfNotExistsCommand.command", - Tag: "bytes,106,req,name=command", + Tag: "bytes,104,opt,name=command", } type CreateRetentionPolicyCommand struct { @@ -634,75 +554,41 @@ func (m *CreateRetentionPolicyCommand) GetRetentionPolicy() *RetentionPolicyInfo var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*CreateRetentionPolicyCommand)(nil), - Field: 107, + Field: 105, Name: "internal.CreateRetentionPolicyCommand.command", - Tag: "bytes,107,req,name=command", + Tag: "bytes,105,opt,name=command", } -type CreateRetentionPolicyIfNotExistsCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CreateRetentionPolicyIfNotExistsCommand) Reset() { - *m = CreateRetentionPolicyIfNotExistsCommand{} -} -func (m *CreateRetentionPolicyIfNotExistsCommand) String() string { return proto.CompactTextString(m) } -func (*CreateRetentionPolicyIfNotExistsCommand) ProtoMessage() {} - -func (m *CreateRetentionPolicyIfNotExistsCommand) GetDatabase() string { - if m != nil && m.Database != nil { - return *m.Database - } - return "" -} - -func (m *CreateRetentionPolicyIfNotExistsCommand) GetRetentionPolicy() *RetentionPolicyInfo { - if m != nil { - return m.RetentionPolicy - } - return nil -} - -var E_CreateRetentionPolicyIfNotExistsCommand_Command = &proto.ExtensionDesc{ - ExtendedType: (*Command)(nil), - ExtensionType: (*CreateRetentionPolicyIfNotExistsCommand)(nil), - Field: 108, - Name: "internal.CreateRetentionPolicyIfNotExistsCommand.command", - Tag: "bytes,108,req,name=command", -} - -type DeleteRetentionPolicyCommand struct { +type DropRetentionPolicyCommand struct { Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (m *DeleteRetentionPolicyCommand) Reset() { *m = DeleteRetentionPolicyCommand{} } -func (m *DeleteRetentionPolicyCommand) String() string { return proto.CompactTextString(m) } -func (*DeleteRetentionPolicyCommand) ProtoMessage() {} +func (m *DropRetentionPolicyCommand) Reset() { *m = DropRetentionPolicyCommand{} } +func (m *DropRetentionPolicyCommand) String() string { return proto.CompactTextString(m) } +func (*DropRetentionPolicyCommand) ProtoMessage() {} -func (m *DeleteRetentionPolicyCommand) GetDatabase() string { +func (m *DropRetentionPolicyCommand) GetDatabase() string { if m != nil && m.Database != nil { return *m.Database } return "" } -func (m *DeleteRetentionPolicyCommand) GetName() string { +func (m *DropRetentionPolicyCommand) GetName() string { if m != nil && m.Name != nil { return *m.Name } return "" } -var E_DeleteRetentionPolicyCommand_Command = &proto.ExtensionDesc{ +var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), - ExtensionType: (*DeleteRetentionPolicyCommand)(nil), - Field: 109, - Name: "internal.DeleteRetentionPolicyCommand.command", - Tag: "bytes,109,req,name=command", + ExtensionType: (*DropRetentionPolicyCommand)(nil), + Field: 106, + Name: "internal.DropRetentionPolicyCommand.command", + Tag: "bytes,106,opt,name=command", } type SetDefaultRetentionPolicyCommand struct { @@ -732,9 +618,9 @@ func (m *SetDefaultRetentionPolicyCommand) GetName() string { var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*SetDefaultRetentionPolicyCommand)(nil), - Field: 110, + Field: 107, Name: "internal.SetDefaultRetentionPolicyCommand.command", - Tag: "bytes,110,req,name=command", + Tag: "bytes,107,opt,name=command", } type UpdateRetentionPolicyCommand struct { @@ -788,54 +674,142 @@ func (m *UpdateRetentionPolicyCommand) GetReplicaN() uint32 { var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*UpdateRetentionPolicyCommand)(nil), - Field: 111, + Field: 108, Name: "internal.UpdateRetentionPolicyCommand.command", - Tag: "bytes,111,req,name=command", + Tag: "bytes,108,opt,name=command", } -type CreateShardGroupIfNotExistsCommand struct { +type CreateShardGroupCommand struct { Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (m *CreateShardGroupIfNotExistsCommand) Reset() { *m = CreateShardGroupIfNotExistsCommand{} } -func (m *CreateShardGroupIfNotExistsCommand) String() string { return proto.CompactTextString(m) } -func (*CreateShardGroupIfNotExistsCommand) ProtoMessage() {} +func (m *CreateShardGroupCommand) Reset() { *m = CreateShardGroupCommand{} } +func (m *CreateShardGroupCommand) String() string { return proto.CompactTextString(m) } +func (*CreateShardGroupCommand) ProtoMessage() {} -func (m *CreateShardGroupIfNotExistsCommand) GetDatabase() string { +func (m *CreateShardGroupCommand) GetDatabase() string { if m != nil && m.Database != nil { return *m.Database } return "" } -func (m *CreateShardGroupIfNotExistsCommand) GetPolicy() string { +func (m *CreateShardGroupCommand) GetPolicy() string { if m != nil && m.Policy != nil { return *m.Policy } return "" } -func (m *CreateShardGroupIfNotExistsCommand) GetTimestamp() int64 { +func (m *CreateShardGroupCommand) GetTimestamp() int64 { if m != nil && m.Timestamp != nil { return *m.Timestamp } return 0 } -var E_CreateShardGroupIfNotExistsCommand_Command = &proto.ExtensionDesc{ +var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), - ExtensionType: (*CreateShardGroupIfNotExistsCommand)(nil), + ExtensionType: (*CreateShardGroupCommand)(nil), + Field: 109, + Name: "internal.CreateShardGroupCommand.command", + Tag: "bytes,109,opt,name=command", +} + +type DeleteShardGroupCommand struct { + Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` + ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DeleteShardGroupCommand) Reset() { *m = DeleteShardGroupCommand{} } +func (m *DeleteShardGroupCommand) String() string { return proto.CompactTextString(m) } +func (*DeleteShardGroupCommand) ProtoMessage() {} + +func (m *DeleteShardGroupCommand) GetDatabase() string { + if m != nil && m.Database != nil { + return *m.Database + } + return "" +} + +func (m *DeleteShardGroupCommand) GetPolicy() string { + if m != nil && m.Policy != nil { + return *m.Policy + } + return "" +} + +func (m *DeleteShardGroupCommand) GetShardGroupID() uint64 { + if m != nil && m.ShardGroupID != nil { + return *m.ShardGroupID + } + return 0 +} + +var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{ + ExtendedType: (*Command)(nil), + ExtensionType: (*DeleteShardGroupCommand)(nil), + Field: 110, + Name: "internal.DeleteShardGroupCommand.command", + Tag: "bytes,110,opt,name=command", +} + +type CreateContinuousQueryCommand struct { + Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CreateContinuousQueryCommand) Reset() { *m = CreateContinuousQueryCommand{} } +func (m *CreateContinuousQueryCommand) String() string { return proto.CompactTextString(m) } +func (*CreateContinuousQueryCommand) ProtoMessage() {} + +func (m *CreateContinuousQueryCommand) GetQuery() string { + if m != nil && m.Query != nil { + return *m.Query + } + return "" +} + +var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{ + ExtendedType: (*Command)(nil), + ExtensionType: (*CreateContinuousQueryCommand)(nil), + Field: 111, + Name: "internal.CreateContinuousQueryCommand.command", + Tag: "bytes,111,opt,name=command", +} + +type DropContinuousQueryCommand struct { + Query *string `protobuf:"bytes,1,req" json:"Query,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DropContinuousQueryCommand) Reset() { *m = DropContinuousQueryCommand{} } +func (m *DropContinuousQueryCommand) String() string { return proto.CompactTextString(m) } +func (*DropContinuousQueryCommand) ProtoMessage() {} + +func (m *DropContinuousQueryCommand) GetQuery() string { + if m != nil && m.Query != nil { + return *m.Query + } + return "" +} + +var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{ + ExtendedType: (*Command)(nil), + ExtensionType: (*DropContinuousQueryCommand)(nil), Field: 112, - Name: "internal.CreateShardGroupIfNotExistsCommand.command", - Tag: "bytes,112,req,name=command", + Name: "internal.DropContinuousQueryCommand.command", + Tag: "bytes,112,opt,name=command", } type CreateUserCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Password *string `protobuf:"bytes,2,req" json:"Password,omitempty"` + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -844,16 +818,16 @@ func (m *CreateUserCommand) Reset() { *m = CreateUserCommand{} } func (m *CreateUserCommand) String() string { return proto.CompactTextString(m) } func (*CreateUserCommand) ProtoMessage() {} -func (m *CreateUserCommand) GetUsername() string { - if m != nil && m.Username != nil { - return *m.Username +func (m *CreateUserCommand) GetName() string { + if m != nil && m.Name != nil { + return *m.Name } return "" } -func (m *CreateUserCommand) GetPassword() string { - if m != nil && m.Password != nil { - return *m.Password +func (m *CreateUserCommand) GetHash() string { + if m != nil && m.Hash != nil { + return *m.Hash } return "" } @@ -870,36 +844,36 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{ ExtensionType: (*CreateUserCommand)(nil), Field: 113, Name: "internal.CreateUserCommand.command", - Tag: "bytes,113,req,name=command", + Tag: "bytes,113,opt,name=command", } -type DeleteUserCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` +type DropUserCommand struct { + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (m *DeleteUserCommand) Reset() { *m = DeleteUserCommand{} } -func (m *DeleteUserCommand) String() string { return proto.CompactTextString(m) } -func (*DeleteUserCommand) ProtoMessage() {} +func (m *DropUserCommand) Reset() { *m = DropUserCommand{} } +func (m *DropUserCommand) String() string { return proto.CompactTextString(m) } +func (*DropUserCommand) ProtoMessage() {} -func (m *DeleteUserCommand) GetUsername() string { - if m != nil && m.Username != nil { - return *m.Username +func (m *DropUserCommand) GetName() string { + if m != nil && m.Name != nil { + return *m.Name } return "" } -var E_DeleteUserCommand_Command = &proto.ExtensionDesc{ +var E_DropUserCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), - ExtensionType: (*DeleteUserCommand)(nil), + ExtensionType: (*DropUserCommand)(nil), Field: 114, - Name: "internal.DeleteUserCommand.command", - Tag: "bytes,114,req,name=command", + Name: "internal.DropUserCommand.command", + Tag: "bytes,114,opt,name=command", } type UpdateUserCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Password *string `protobuf:"bytes,2,req" json:"Password,omitempty"` + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -907,16 +881,16 @@ func (m *UpdateUserCommand) Reset() { *m = UpdateUserCommand{} } func (m *UpdateUserCommand) String() string { return proto.CompactTextString(m) } func (*UpdateUserCommand) ProtoMessage() {} -func (m *UpdateUserCommand) GetUsername() string { - if m != nil && m.Username != nil { - return *m.Username +func (m *UpdateUserCommand) GetName() string { + if m != nil && m.Name != nil { + return *m.Name } return "" } -func (m *UpdateUserCommand) GetPassword() string { - if m != nil && m.Password != nil { - return *m.Password +func (m *UpdateUserCommand) GetHash() string { + if m != nil && m.Hash != nil { + return *m.Hash } return "" } @@ -926,7 +900,7 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{ ExtensionType: (*UpdateUserCommand)(nil), Field: 115, Name: "internal.UpdateUserCommand.command", - Tag: "bytes,115,req,name=command", + Tag: "bytes,115,opt,name=command", } type SetPrivilegeCommand struct { @@ -958,67 +932,25 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{ ExtensionType: (*SetPrivilegeCommand)(nil), Field: 116, Name: "internal.SetPrivilegeCommand.command", - Tag: "bytes,116,req,name=command", -} - -type DeleteShardGroupCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` - ShardID *uint64 `protobuf:"varint,3,req" json:"ShardID,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *DeleteShardGroupCommand) Reset() { *m = DeleteShardGroupCommand{} } -func (m *DeleteShardGroupCommand) String() string { return proto.CompactTextString(m) } -func (*DeleteShardGroupCommand) ProtoMessage() {} - -func (m *DeleteShardGroupCommand) GetDatabase() string { - if m != nil && m.Database != nil { - return *m.Database - } - return "" -} - -func (m *DeleteShardGroupCommand) GetPolicy() string { - if m != nil && m.Policy != nil { - return *m.Policy - } - return "" -} - -func (m *DeleteShardGroupCommand) GetShardID() uint64 { - if m != nil && m.ShardID != nil { - return *m.ShardID - } - return 0 -} - -var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{ - ExtendedType: (*Command)(nil), - ExtensionType: (*DeleteShardGroupCommand)(nil), - Field: 117, - Name: "internal.DeleteShardGroupCommand.command", - Tag: "bytes,117,req,name=command", + Tag: "bytes,116,opt,name=command", } func init() { proto.RegisterEnum("internal.Command_Type", Command_Type_name, Command_Type_value) - proto.RegisterExtension(E_CreateContinuousQueryCommand_Command) - proto.RegisterExtension(E_DropContinuousQueryCommand_Command) proto.RegisterExtension(E_CreateNodeCommand_Command) proto.RegisterExtension(E_DeleteNodeCommand_Command) proto.RegisterExtension(E_CreateDatabaseCommand_Command) proto.RegisterExtension(E_DropDatabaseCommand_Command) - proto.RegisterExtension(E_CreateDatabaseIfNotExistsCommand_Command) proto.RegisterExtension(E_CreateRetentionPolicyCommand_Command) - proto.RegisterExtension(E_CreateRetentionPolicyIfNotExistsCommand_Command) - proto.RegisterExtension(E_DeleteRetentionPolicyCommand_Command) + proto.RegisterExtension(E_DropRetentionPolicyCommand_Command) proto.RegisterExtension(E_SetDefaultRetentionPolicyCommand_Command) proto.RegisterExtension(E_UpdateRetentionPolicyCommand_Command) - proto.RegisterExtension(E_CreateShardGroupIfNotExistsCommand_Command) + proto.RegisterExtension(E_CreateShardGroupCommand_Command) + proto.RegisterExtension(E_DeleteShardGroupCommand_Command) + proto.RegisterExtension(E_CreateContinuousQueryCommand_Command) + proto.RegisterExtension(E_DropContinuousQueryCommand_Command) proto.RegisterExtension(E_CreateUserCommand_Command) - proto.RegisterExtension(E_DeleteUserCommand_Command) + proto.RegisterExtension(E_DropUserCommand_Command) proto.RegisterExtension(E_UpdateUserCommand_Command) proto.RegisterExtension(E_SetPrivilegeCommand_Command) - proto.RegisterExtension(E_DeleteShardGroupCommand_Command) } diff --git a/meta/internal/meta.proto b/meta/internal/meta.proto index 2fa15520aa..db8d8d2d3f 100644 --- a/meta/internal/meta.proto +++ b/meta/internal/meta.proto @@ -70,97 +70,66 @@ message Command { extensions 100 to max; enum Type { - CreateContinuousQueryCommand = 1; - DropContinuousQueryCommand = 2; - CreateNodeCommand = 3; - DeleteNodeCommand = 4; - CreateDatabaseCommand = 5; - DropDatabaseCommand = 6; - CreateDatabaseIfNotExistsCommand = 7; - CreateRetentionPolicyCommand = 8; - CreateRetentionPolicyIfNotExistsCommand = 9; - DeleteRetentionPolicyCommand = 10; - SetDefaultRetentionPolicyCommand = 11; - UpdateRetentionPolicyCommand = 12; - CreateShardGroupIfNotExistsCommand = 13; - CreateUserCommand = 14; - DeleteUserCommand = 15; - UpdateUserCommand = 16; - SetPrivilegeCommand = 17; - DeleteShardGroupCommand = 18; + CreateNodeCommand = 1; + DeleteNodeCommand = 2; + CreateDatabaseCommand = 3; + DropDatabaseCommand = 4; + CreateRetentionPolicyCommand = 5; + DropRetentionPolicyCommand = 6; + SetDefaultRetentionPolicyCommand = 7; + UpdateRetentionPolicyCommand = 8; + CreateShardGroupCommand = 9; + DeleteShardGroupCommand = 10; + CreateContinuousQueryCommand = 11; + DropContinuousQueryCommand = 12; + CreateUserCommand = 13; + DropUserCommand = 14; + UpdateUserCommand = 15; + SetPrivilegeCommand = 16; } required Type type = 1; } -message CreateContinuousQueryCommand { - extend Command { - required CreateContinuousQueryCommand command = 100; - } - required string Query = 1; -} - -message DropContinuousQueryCommand { - extend Command { - required DropContinuousQueryCommand command = 101; - } - required string Query = 1; -} - message CreateNodeCommand { extend Command { - required CreateNodeCommand command = 102; + optional CreateNodeCommand command = 101; } required string Host = 1; } message DeleteNodeCommand { extend Command { - required DeleteNodeCommand command = 103; + optional DeleteNodeCommand command = 102; } - required string ID = 1; + required uint64 ID = 1; } message CreateDatabaseCommand { extend Command { - required CreateDatabaseCommand command = 104; + optional CreateDatabaseCommand command = 103; } required string Name = 1; } message DropDatabaseCommand { extend Command { - required DropDatabaseCommand command = 105; - } - required string Name = 1; -} - -message CreateDatabaseIfNotExistsCommand { - extend Command { - required CreateDatabaseIfNotExistsCommand command = 106; + optional DropDatabaseCommand command = 104; } required string Name = 1; } message CreateRetentionPolicyCommand { extend Command { - required CreateRetentionPolicyCommand command = 107; + optional CreateRetentionPolicyCommand command = 105; } required string Database = 1; required RetentionPolicyInfo RetentionPolicy = 2; } -message CreateRetentionPolicyIfNotExistsCommand { +message DropRetentionPolicyCommand { extend Command { - required CreateRetentionPolicyIfNotExistsCommand command = 108; - } - required string Database = 1; - required RetentionPolicyInfo RetentionPolicy = 2; -} - -message DeleteRetentionPolicyCommand { - extend Command { - required DeleteRetentionPolicyCommand command = 109; + optional DropRetentionPolicyCommand command = 106; } required string Database = 1; required string Name = 2; @@ -168,7 +137,7 @@ message DeleteRetentionPolicyCommand { message SetDefaultRetentionPolicyCommand { extend Command { - required SetDefaultRetentionPolicyCommand command = 110; + optional SetDefaultRetentionPolicyCommand command = 107; } required string Database = 1; required string Name = 2; @@ -176,7 +145,7 @@ message SetDefaultRetentionPolicyCommand { message UpdateRetentionPolicyCommand { extend Command { - required UpdateRetentionPolicyCommand command = 111; + optional UpdateRetentionPolicyCommand command = 108; } required string Database = 1; required string Name = 2; @@ -185,53 +154,67 @@ message UpdateRetentionPolicyCommand { optional uint32 ReplicaN = 5; } -message CreateShardGroupIfNotExistsCommand { +message CreateShardGroupCommand { extend Command { - required CreateShardGroupIfNotExistsCommand command = 112; + optional CreateShardGroupCommand command = 109; } required string Database = 1; required string Policy = 2; required int64 Timestamp = 3; } +message DeleteShardGroupCommand { + extend Command { + optional DeleteShardGroupCommand command = 110; + } + required string Database = 1; + required string Policy = 2; + required uint64 ShardGroupID = 3; +} + +message CreateContinuousQueryCommand { + extend Command { + optional CreateContinuousQueryCommand command = 111; + } + required string Query = 1; +} + +message DropContinuousQueryCommand { + extend Command { + optional DropContinuousQueryCommand command = 112; + } + required string Query = 1; +} + message CreateUserCommand { extend Command { - required CreateUserCommand command = 113; + optional CreateUserCommand command = 113; } - required string Username = 1; - required string Password = 2; + required string Name = 1; + required string Hash = 2; required bool Admin = 3; } -message DeleteUserCommand { +message DropUserCommand { extend Command { - required DeleteUserCommand command = 114; + optional DropUserCommand command = 114; } - required string Username = 1; + required string Name = 1; } message UpdateUserCommand { extend Command { - required UpdateUserCommand command = 115; + optional UpdateUserCommand command = 115; } - required string Username = 1; - required string Password = 2; + required string Name = 1; + required string Hash = 2; } message SetPrivilegeCommand { extend Command { - required SetPrivilegeCommand command = 116; + optional SetPrivilegeCommand command = 116; } required string Username = 1; required UserPrivilege Privilege = 2; } -message DeleteShardGroupCommand { - extend Command { - required DeleteShardGroupCommand command = 117; - } - required string Database = 1; - required string Policy = 2; - required uint64 ShardID = 3; -} - diff --git a/meta/store.go b/meta/store.go index 4841e7fa14..963020d079 100644 --- a/meta/store.go +++ b/meta/store.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" "github.com/influxdb/influxdb/meta/internal" + "golang.org/x/crypto/bcrypt" ) const ( @@ -95,7 +96,7 @@ func (s *Store) Open(path string) error { // Check if store has already been opened. if s.opened() { - return errors.New("raft store already open") + return ErrStoreOpen } s.path = path @@ -143,7 +144,7 @@ func (s *Store) Open(path string) error { } // Create raft log. - r, err := raft.NewRaft(config, (*StoreFSM)(s), store, store, snapshots, s.peers, s.transport) + r, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, s.peers, s.transport) if err != nil { return fmt.Errorf("new raft: %s", err) } @@ -168,7 +169,7 @@ func (s *Store) Close() error { func (s *Store) close() error { // Check if store has already been closed. if !s.opened() { - return errors.New("raft store already closed") + return ErrStoreClosed } s.path = "" @@ -188,31 +189,42 @@ func (s *Store) close() error { return nil } -// SetData sets the root meta data. -func (s *Store) SetData(data *Data) { - s.mu.Lock() - defer s.mu.Unlock() - s.data = data -} - // LeaderCh returns a channel that notifies on leadership change. // Panics when the store has not been opened yet. func (s *Store) LeaderCh() <-chan bool { s.mu.RLock() defer s.mu.RUnlock() - - if s.raft == nil { - panic("cannot retrieve leadership channel when closed") - } - + assert(s.raft != nil, "cannot retrieve leadership channel when closed") return s.raft.LeaderCh() } +// Node returns a node by id. +func (s *Store) Node(id uint64) (ni *NodeInfo, err error) { + err = s.read(func(data *Data) error { + ni = data.Node(id) + if ni == nil { + return errInvalidate + } + return nil + }) + return +} + +// NodeByHost returns a node by hostname. +func (s *Store) NodeByHost(host string) (ni *NodeInfo, err error) { + err = s.read(func(data *Data) error { + ni = data.NodeByHost(host) + if ni == nil { + return errInvalidate + } + return nil + }) + return +} + // CreateNode creates a new node in the store. func (s *Store) CreateNode(host string) (*NodeInfo, error) { - if err := s.exec( - internal.Command_CreateNodeCommand, - internal.E_CreateNodeCommand_Command, + if err := s.exec(internal.Command_CreateNodeCommand, internal.E_CreateNodeCommand_Command, &internal.CreateNodeCommand{ Host: proto.String(host), }, @@ -222,57 +234,318 @@ func (s *Store) CreateNode(host string) (*NodeInfo, error) { return s.NodeByHost(host) } -// NodeByHost returns a node by hostname. -func (s *Store) NodeByHost(host string) (*NodeInfo, error) { - n := s.data.NodeByHost(host) - - // FIX(benbjohnson): Invalidate cache if not found and check again. - - return n, nil +// DeleteNode removes a node from the metastore by id. +func (s *Store) DeleteNode(id uint64) error { + return s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command, + &internal.DeleteNodeCommand{ + ID: proto.Uint64(id), + }, + ) } -// CreateContinuousQuery(query string) (*ContinuousQueryInfo, error) -// DropContinuousQuery(query string) error +// Database returns a database by name. +func (s *Store) Database(name string) (di *DatabaseInfo, err error) { + err = s.read(func(data *Data) error { + di = data.Database(name) + if di == nil { + return errInvalidate + } + return nil + }) + return +} -// Node(id uint64) (*NodeInfo, error) -// NodeByHost(host string) (*NodeInfo, error) -// CreateNode(host string) (*NodeInfo, error) -// DeleteNode(id uint64) error +// CreateDatabase creates a new database in the store. +func (s *Store) CreateDatabase(name string) (*DatabaseInfo, error) { + if err := s.exec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, + &internal.CreateDatabaseCommand{ + Name: proto.String(name), + }, + ); err != nil { + return nil, err + } + return s.Database(name) +} -// Database(name string) (*DatabaseInfo, error) -// CreateDatabase(name string) (*DatabaseInfo, error) -// CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error) -// DropDatabase(name string) error +// FIX: CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error) -// RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) -// CreateRetentionPolicy(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error) -// CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error) -// SetDefaultRetentionPolicy(database, name string) error -// UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) (*RetentionPolicyInfo, error) -// DeleteRetentionPolicy(database, name string) error +// DropDatabase removes a database from the metastore by name. +func (s *Store) DropDatabase(name string) error { + return s.exec(internal.Command_DropDatabaseCommand, internal.E_DropDatabaseCommand_Command, + &internal.DropDatabaseCommand{ + Name: proto.String(name), + }, + ) +} -// ShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) -// CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) -// DeleteShardGroup(database, policy string, shardID uint64) error +// RetentionPolicy returns a retention policy for a database by name. +func (s *Store) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) { + err = s.read(func(data *Data) error { + rpi, err = data.RetentionPolicy(database, name) + if err != nil { + return err + } else if rpi == nil { + return errInvalidate + } + return nil + }) + return +} -// User(username string) (*UserInfo, error) -// CreateUser(username, password string, admin bool) (*UserInfo, error) -// UpdateUser(username, password string) (*UserInfo, error) -// DeleteUser(username string) error -// SetPrivilege(p influxql.Privilege, username string, dbname string) error +// CreateRetentionPolicy creates a new retention policy for a database. +func (s *Store) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) { + if err := s.exec(internal.Command_CreateRetentionPolicyCommand, internal.E_CreateRetentionPolicyCommand_Command, + &internal.CreateRetentionPolicyCommand{ + Database: proto.String(database), + RetentionPolicy: rpi.protobuf(), + }, + ); err != nil { + return nil, err + } + + return s.RetentionPolicy(database, rpi.Name) +} + +// SetDefaultRetentionPolicy sets the default retention policy for a database. +func (s *Store) SetDefaultRetentionPolicy(database, name string) error { + return s.exec(internal.Command_SetDefaultRetentionPolicyCommand, internal.E_SetDefaultRetentionPolicyCommand_Command, + &internal.SetDefaultRetentionPolicyCommand{ + Database: proto.String(database), + Name: proto.String(name), + }, + ) +} + +// UpdateRetentionPolicy updates an existing retention policy. +func (s *Store) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { + var newName *string + if rpu.Name != nil { + newName = rpu.Name + } + + var duration *int64 + if rpu.Duration != nil { + value := int64(*rpu.Duration) + duration = &value + } + + var replicaN *uint32 + if rpu.Duration != nil { + value := uint32(*rpu.ReplicaN) + replicaN = &value + } + + return s.exec(internal.Command_UpdateRetentionPolicyCommand, internal.E_UpdateRetentionPolicyCommand_Command, + &internal.UpdateRetentionPolicyCommand{ + Database: proto.String(database), + Name: proto.String(name), + NewName: newName, + Duration: duration, + ReplicaN: replicaN, + }, + ) +} + +// DropRetentionPolicy removes a policy from a database by name. +func (s *Store) DropRetentionPolicy(database, name string) error { + return s.exec(internal.Command_DropRetentionPolicyCommand, internal.E_DropRetentionPolicyCommand_Command, + &internal.DropRetentionPolicyCommand{ + Database: proto.String(database), + Name: proto.String(name), + }, + ) +} + +// FIX: CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicyInfo) (*RetentionPolicyInfo, error) + +// CreateShardGroup creates a new shard group in a retention policy for a given time. +func (s *Store) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { + if err := s.exec(internal.Command_CreateShardGroupCommand, internal.E_CreateShardGroupCommand_Command, + &internal.CreateShardGroupCommand{ + Database: proto.String(database), + Policy: proto.String(policy), + Timestamp: proto.Int64(timestamp.UnixNano()), + }, + ); err != nil { + return nil, err + } + + return s.ShardGroupByTimestamp(database, policy, timestamp) +} + +// DeleteShardGroup removes an existing shard group from a policy by ID. +func (s *Store) DeleteShardGroup(database, policy string, id uint64) error { + return s.exec(internal.Command_DeleteShardGroupCommand, internal.E_DeleteShardGroupCommand_Command, + &internal.DeleteShardGroupCommand{ + Database: proto.String(database), + Policy: proto.String(policy), + ShardGroupID: proto.Uint64(id), + }, + ) +} + +// ShardGroupByTimestamp returns a shard group for a policy by timestamp. +func (s *Store) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (sgi *ShardGroupInfo, err error) { + err = s.read(func(data *Data) error { + sgi, err = data.ShardGroupByTimestamp(database, policy, timestamp) + if err != nil { + return err + } else if sgi == nil { + return errInvalidate + } + return nil + }) + return +} + +// CreateContinuousQuery creates a new continuous query on the store. +func (s *Store) CreateContinuousQuery(query string) error { + return s.exec(internal.Command_CreateContinuousQueryCommand, internal.E_CreateContinuousQueryCommand_Command, + &internal.CreateContinuousQueryCommand{ + Query: proto.String(query), + }, + ) +} + +// DropContinuousQuery removes a continuous query from the store. +func (s *Store) DropContinuousQuery(query string) error { + return s.exec(internal.Command_DropContinuousQueryCommand, internal.E_DropContinuousQueryCommand_Command, + &internal.DropContinuousQueryCommand{ + Query: proto.String(query), + }, + ) +} + +// ContinuousQueries returns all continuous queries. +func (s *Store) ContinuousQueries() (a []ContinuousQueryInfo, err error) { + err = s.read(func(data *Data) error { + a = data.ContinuousQueries + return nil + }) + return +} + +// User returns a user by name. +func (s *Store) User(name string) (ui *UserInfo, err error) { + err = s.read(func(data *Data) error { + ui = data.User(name) + if ui == nil { + return errInvalidate + } + return nil + }) + return +} + +// Users returns a list of all users. +func (s *Store) Users() (a []UserInfo, err error) { + err = s.read(func(data *Data) error { + a = data.Users + return nil + }) + return +} + +// CreateUser creates a new user in the store. +func (s *Store) CreateUser(name, password string, admin bool) (*UserInfo, error) { + // Hash the password before serializing it. + hash, err := HashPassword(password) + if err != nil { + return nil, err + } + + // Serialize command and send it to the leader. + if err := s.exec(internal.Command_CreateUserCommand, internal.E_CreateUserCommand_Command, + &internal.CreateUserCommand{ + Name: proto.String(name), + Hash: proto.String(string(hash)), + Admin: proto.Bool(admin), + }, + ); err != nil { + return nil, err + } + return s.User(name) +} + +// DropUser removes a user from the metastore by name. +func (s *Store) DropUser(name string) error { + return s.exec(internal.Command_DropUserCommand, internal.E_DropUserCommand_Command, + &internal.DropUserCommand{ + Name: proto.String(name), + }, + ) +} + +// UpdateUser updates an existing user in the store. +func (s *Store) UpdateUser(name, password string) error { + // Hash the password before serializing it. + hash, err := HashPassword(password) + if err != nil { + return err + } + + // Serialize command and send it to the leader. + return s.exec(internal.Command_UpdateUserCommand, internal.E_UpdateUserCommand_Command, + &internal.UpdateUserCommand{ + Name: proto.String(name), + Hash: proto.String(string(hash)), + }, + ) +} + +// read executes a function with the current metadata. +// If an error is returned then the cache is invalidated and retried. +// +// The error returned by the retry is passed through to the original caller +// unless the error is errInvalidate. A nil error is passed through when +// errInvalidate is returned. +func (s *Store) read(fn func(*Data) error) error { + // First use the cached metadata. + s.mu.RLock() + data := s.data + s.mu.RUnlock() + + // Execute fn against cached data. + // Return immediately if there was no error. + if err := fn(data); err == nil { + return nil + } + + // If an error occurred then invalidate cache and retry. + if err := s.invalidate(); err != nil { + return err + } + + // Re-read the metadata. + s.mu.RLock() + data = s.data + s.mu.RUnlock() + + // Passthrough error unless it is a cache invalidation. + if err := fn(data); err != nil && err != errInvalidate { + return err + } + + return nil +} + +// errInvalidate is returned to read() when the cache should be invalidated +// but an error should not be passed through to the caller. +var errInvalidate = errors.New("invalidate cache") + +func (s *Store) invalidate() error { + return nil // FIX(benbjohnson): Reload cache from the leader. +} func (s *Store) exec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error { // Create command. cmd := &internal.Command{Type: &typ} - if err := proto.SetExtension(cmd, desc, value); err != nil { - return fmt.Errorf("set extension: %s", err) - } + err := proto.SetExtension(cmd, desc, value) + assert(err == nil, "proto.SetExtension: %s", err) // Marshal to a byte slice. b, err := proto.Marshal(cmd) - if err != nil { - return fmt.Errorf("marshal proto: %s", err) - } + assert(err == nil, "proto.Marshal: %s", err) // Apply to raft log. f := s.raft.Apply(b, 0) @@ -280,27 +553,32 @@ func (s *Store) exec(typ internal.Command_Type, desc *proto.ExtensionDesc, value return err } - if resp := f.Response(); resp != nil { - panic(fmt.Sprintf("unexpected response: %#v", resp)) + // Return response if it's an error. + // No other non-nil objects should be returned. + resp := f.Response() + if err, ok := resp.(error); ok { + return lookupError(err) } + assert(resp == nil, "unexpected response: %#v", resp) return nil } -// StoreFSM represents the finite state machine used by Store to interact with Raft. -type StoreFSM Store +// storeFSM represents the finite state machine used by Store to interact with Raft. +type storeFSM Store -func (fsm *StoreFSM) Apply(l *raft.Log) interface{} { +func (fsm *storeFSM) Apply(l *raft.Log) interface{} { var cmd internal.Command if err := proto.Unmarshal(l.Data, &cmd); err != nil { panic(fmt.Errorf("cannot marshal command: %x", l.Data)) } + // Lock the store. + s := (*Store)(fsm) + s.mu.Lock() + defer s.mu.Unlock() + switch cmd.GetType() { - case internal.Command_CreateContinuousQueryCommand: - return fsm.applyCreateContinuousQueryCommand(&cmd) - case internal.Command_DropContinuousQueryCommand: - return fsm.applyDropContinuousQueryCommand(&cmd) case internal.Command_CreateNodeCommand: return fsm.applyCreateNodeCommand(&cmd) case internal.Command_DeleteNodeCommand: @@ -309,24 +587,24 @@ func (fsm *StoreFSM) Apply(l *raft.Log) interface{} { return fsm.applyCreateDatabaseCommand(&cmd) case internal.Command_DropDatabaseCommand: return fsm.applyDropDatabaseCommand(&cmd) - case internal.Command_CreateDatabaseIfNotExistsCommand: - return fsm.applyCreateDatabaseIfNotExistsCommand(&cmd) case internal.Command_CreateRetentionPolicyCommand: return fsm.applyCreateRetentionPolicyCommand(&cmd) - case internal.Command_CreateRetentionPolicyIfNotExistsCommand: - return fsm.applyCreateRetentionPolicyIfNotExistsCommand(&cmd) - case internal.Command_DeleteRetentionPolicyCommand: - return fsm.applyDeleteRetentionPolicyCommand(&cmd) + case internal.Command_DropRetentionPolicyCommand: + return fsm.applyDropRetentionPolicyCommand(&cmd) case internal.Command_SetDefaultRetentionPolicyCommand: return fsm.applySetDefaultRetentionPolicyCommand(&cmd) case internal.Command_UpdateRetentionPolicyCommand: return fsm.applyUpdateRetentionPolicyCommand(&cmd) - case internal.Command_CreateShardGroupIfNotExistsCommand: - return fsm.applyCreateShardGroupIfNotExistsCommand(&cmd) + case internal.Command_CreateShardGroupCommand: + return fsm.applyCreateShardGroupCommand(&cmd) + case internal.Command_CreateContinuousQueryCommand: + return fsm.applyCreateContinuousQueryCommand(&cmd) + case internal.Command_DropContinuousQueryCommand: + return fsm.applyDropContinuousQueryCommand(&cmd) case internal.Command_CreateUserCommand: return fsm.applyCreateUserCommand(&cmd) - case internal.Command_DeleteUserCommand: - return fsm.applyDeleteUserCommand(&cmd) + case internal.Command_DropUserCommand: + return fsm.applyDropUserCommand(&cmd) case internal.Command_UpdateUserCommand: return fsm.applyUpdateUserCommand(&cmd) case internal.Command_SetPrivilegeCommand: @@ -338,97 +616,245 @@ func (fsm *StoreFSM) Apply(l *raft.Log) interface{} { } } -func (fsm *StoreFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} { +func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} { ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command) v := ext.(*internal.CreateNodeCommand) // Copy data and update. - data, err := fsm.data.CreateNode(v.GetHost()) - if err != nil { + other := fsm.data.Clone() + if err := other.CreateNode(v.GetHost()); err != nil { return err } - (*Store)(fsm).SetData(data) + fsm.data = other return nil } -func (fsm *StoreFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} { +func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DeleteNodeCommand_Command) + v := ext.(*internal.DeleteNodeCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DeleteNode(v.GetID()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command) + v := ext.(*internal.CreateDatabaseCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.CreateDatabase(v.GetName()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DropDatabaseCommand_Command) + v := ext.(*internal.DropDatabaseCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DropDatabase(v.GetName()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_CreateRetentionPolicyCommand_Command) + v := ext.(*internal.CreateRetentionPolicyCommand) + pb := v.GetRetentionPolicy() + + // Copy data and update. + other := fsm.data.Clone() + if err := other.CreateRetentionPolicy(v.GetDatabase(), + &RetentionPolicyInfo{ + Name: pb.GetName(), + ReplicaN: int(pb.GetReplicaN()), + Duration: time.Duration(pb.GetDuration()), + ShardGroupDuration: time.Duration(pb.GetShardGroupDuration()), + }); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyDropRetentionPolicyCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DropRetentionPolicyCommand_Command) + v := ext.(*internal.DropRetentionPolicyCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DropRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_SetDefaultRetentionPolicyCommand_Command) + v := ext.(*internal.SetDefaultRetentionPolicyCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.SetDefaultRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_UpdateRetentionPolicyCommand_Command) + v := ext.(*internal.UpdateRetentionPolicyCommand) + + // Create update object. + rpu := RetentionPolicyUpdate{Name: v.NewName} + if v.Duration != nil { + value := time.Duration(v.GetDuration()) + rpu.Duration = &value + } + if v.ReplicaN != nil { + value := int(v.GetReplicaN()) + rpu.ReplicaN = &value + } + + // Copy data and update. + other := fsm.data.Clone() + if err := other.UpdateRetentionPolicy(v.GetDatabase(), v.GetName(), &rpu); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyCreateShardGroupCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_CreateShardGroupCommand_Command) + v := ext.(*internal.CreateShardGroupCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.CreateShardGroup(v.GetDatabase(), v.GetPolicy(), time.Unix(0, v.GetTimestamp())); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DeleteShardGroupCommand_Command) + v := ext.(*internal.DeleteShardGroupCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DeleteShardGroup(v.GetDatabase(), v.GetPolicy(), v.GetShardGroupID()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_CreateContinuousQueryCommand_Command) + v := ext.(*internal.CreateContinuousQueryCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.CreateContinuousQuery(v.GetQuery()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DropContinuousQueryCommand_Command) + v := ext.(*internal.DropContinuousQueryCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DropContinuousQuery(v.GetQuery()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyCreateUserCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_CreateUserCommand_Command) + v := ext.(*internal.CreateUserCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.CreateUser(v.GetName(), v.GetHash(), v.GetAdmin()); err != nil { + return err + } + fsm.data = other + + return nil +} + +func (fsm *storeFSM) applyDropUserCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_DropUserCommand_Command) + v := ext.(*internal.DropUserCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.DropUser(v.GetName()); err != nil { + return err + } + fsm.data = other + return nil +} + +func (fsm *storeFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} { + ext, _ := proto.GetExtension(cmd, internal.E_UpdateUserCommand_Command) + v := ext.(*internal.UpdateUserCommand) + + // Copy data and update. + other := fsm.data.Clone() + if err := other.UpdateUser(v.GetName(), v.GetHash()); err != nil { + return err + } + fsm.data = other + return nil +} + +func (fsm *storeFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} { panic("not yet implemented") } -func (fsm *StoreFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateDatabaseIfNotExistsCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateRetentionPolicyIfNotExistsCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyDeleteRetentionPolicyCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateShardGroupIfNotExistsCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyCreateUserCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyDeleteUserCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} { - panic("not yet implemented") -} - -func (fsm *StoreFSM) Snapshot() (raft.FSMSnapshot, error) { +func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) { s := (*Store)(fsm) s.mu.Lock() defer s.mu.Unlock() - return &StoreFSMSnapshot{Data: (*Store)(fsm).data}, nil + return &storeFSMSnapshot{Data: (*Store)(fsm).data}, nil } -func (fsm *StoreFSM) Restore(r io.ReadCloser) error { +func (fsm *storeFSM) Restore(r io.ReadCloser) error { // Read all bytes. // b, err := ioutil.ReadAll(r) // if err != nil { @@ -441,11 +867,11 @@ func (fsm *StoreFSM) Restore(r io.ReadCloser) error { return nil } -type StoreFSMSnapshot struct { +type storeFSMSnapshot struct { Data *Data } -func (s *StoreFSMSnapshot) Persist(sink raft.SnapshotSink) error { +func (s *storeFSMSnapshot) Persist(sink raft.SnapshotSink) error { // TODO: Encode data. // TODO: sink.Write(p) // TODO: sink.Close() @@ -453,13 +879,36 @@ func (s *StoreFSMSnapshot) Persist(sink raft.SnapshotSink) error { } // Release is invoked when we are finished with the snapshot -func (s *StoreFSMSnapshot) Release() {} +func (s *storeFSMSnapshot) Release() {} // RetentionPolicyUpdate represents retention policy fields to be updated. type RetentionPolicyUpdate struct { Name *string Duration *time.Duration - ReplicaN *uint32 + ReplicaN *int +} + +func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v } +func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v } +func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v } + +// BcryptCost is the cost associated with generating password with Bcrypt. +// This setting is lowered during testing to improve test suite performance. +var BcryptCost = 10 + +// HashPassword generates a cryptographically secure hash for password. +// Returns an error if the password is invalid or a hash cannot be generated. +func HashPassword(password string) ([]byte, error) { + // The second arg is the cost of the hashing, higher is slower but makes + // it harder to brute force, since it will be really slow and impractical + return bcrypt.GenerateFromPassword([]byte(password), BcryptCost) +} + +// assert will panic with a given formatted message if the given condition is false. +func assert(condition bool, msg string, v ...interface{}) { + if !condition { + panic(fmt.Sprintf("assert failed: "+msg, v...)) + } } func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } diff --git a/meta/store_test.go b/meta/store_test.go index 5db09a872a..be8133098d 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "os" + "reflect" "testing" "time" @@ -39,18 +40,499 @@ func TestStore_Open(t *testing.T) { } } +// Ensure the store returns an error +func TestStore_Open_ErrStoreOpen(t *testing.T) { + s := MustOpenStore() + defer s.Close() + + if err := s.Open(s.Path()); err != meta.ErrStoreOpen { + t.Fatalf("unexpected error: %s", err) + } +} + // Ensure the store can create a new node. func TestStore_CreateNode(t *testing.T) { s := MustOpenStore() defer s.Close() <-s.LeaderCh() - ni, err := s.CreateNode("host0") - if err != nil { + // Create node. + if ni, err := s.CreateNode("host0"); err != nil { t.Fatal(err) } else if *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) { t.Fatalf("unexpected node: %#v", ni) } + + // Create another node. + if ni, err := s.CreateNode("host1"); err != nil { + t.Fatal(err) + } else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) { + t.Fatalf("unexpected node: %#v", ni) + } +} + +// Ensure that creating an existing node returns an error. +func TestStore_CreateNode_ErrNodeExists(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create node. + if _, err := s.CreateNode("host0"); err != nil { + t.Fatal(err) + } + + // Create it again. + if _, err := s.CreateNode("host0"); err != meta.ErrNodeExists { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the store can find a node by ID. +func TestStore_Node(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create nodes. + for i := 0; i < 3; i++ { + if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil { + t.Fatal(err) + } + } + + // Find second node. + if ni, err := s.Node(2); err != nil { + t.Fatal(err) + } else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) { + t.Fatalf("unexpected node: %#v", ni) + } +} + +// Ensure the store can find a node by host. +func TestStore_NodeByHost(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create nodes. + for i := 0; i < 3; i++ { + if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil { + t.Fatal(err) + } + } + + // Find second node. + if ni, err := s.NodeByHost("host1"); err != nil { + t.Fatal(err) + } else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) { + t.Fatalf("unexpected node: %#v", ni) + } +} + +// Ensure the store can delete an existing node. +func TestStore_DeleteNode(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create nodes. + for i := 0; i < 3; i++ { + if _, err := s.CreateNode(fmt.Sprintf("host%d", i)); err != nil { + t.Fatal(err) + } + } + + // Remove second node. + if err := s.DeleteNode(2); err != nil { + t.Fatal(err) + } + + // Ensure remaining nodes are correct. + if ni, _ := s.Node(1); *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) { + t.Fatalf("unexpected node(1): %#v", ni) + } + if ni, _ := s.Node(2); ni != nil { + t.Fatalf("unexpected node(2): %#v", ni) + } + if ni, _ := s.Node(3); *ni != (meta.NodeInfo{ID: 3, Host: "host2"}) { + t.Fatalf("unexpected node(3): %#v", ni) + } +} + +// Ensure the store returns an error when deleting a node that doesn't exist. +func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + if err := s.DeleteNode(2); err != meta.ErrNodeNotFound { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the store can create a new database. +func TestStore_CreateDatabase(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create database. + if di, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db0"}) { + t.Fatalf("unexpected database: %#v", di) + } + + // Create another database. + if di, err := s.CreateDatabase("db1"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db1"}) { + t.Fatalf("unexpected database: %#v", di) + } +} + +// Ensure the store can delete an existing database. +func TestStore_DropDatabase(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create databases. + for i := 0; i < 3; i++ { + if _, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil { + t.Fatal(err) + } + } + + // Remove a database. + if err := s.DropDatabase("db1"); err != nil { + t.Fatal(err) + } + + // Ensure remaining nodes are correct. + if di, _ := s.Database("db0"); !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db0"}) { + t.Fatalf("unexpected database(0): %#v", di) + } + if di, _ := s.Database("db1"); di != nil { + t.Fatalf("unexpected database(1): %#v", di) + } + if di, _ := s.Database("db2"); !reflect.DeepEqual(di, &meta.DatabaseInfo{Name: "db2"}) { + t.Fatalf("unexpected database(2): %#v", di) + } +} + +// Ensure the store returns an error when dropping a database that doesn't exist. +func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + if err := s.DropDatabase("no_such_database"); err != meta.ErrDatabaseNotFound { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the store can create a retention policy on a database. +func TestStore_CreateRetentionPolicy(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create database. + if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + + // Create policy on database. + if rpi, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{ + Name: "rp0", + ReplicaN: 2, + Duration: 48 * time.Hour, + }); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{ + Name: "rp0", + ReplicaN: 2, + Duration: 48 * time.Hour, + ShardGroupDuration: 24 * time.Hour, + }) { + t.Fatalf("unexpected policy: %#v", rpi) + } +} + +// Ensure the store can delete a retention policy. +func TestStore_DropRetentionPolicy(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create database. + if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + + // Create policies. + for i := 0; i < 3; i++ { + if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i)}); err != nil { + t.Fatal(err) + } + } + + // Remove a policy. + if err := s.DropRetentionPolicy("db0", "rp1"); err != nil { + t.Fatal(err) + } + + // Ensure remaining policies are correct. + if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ShardGroupDuration: 7 * 24 * time.Hour}) { + t.Fatalf("unexpected policy(0): %#v", rpi) + } + if rpi, _ := s.RetentionPolicy("db0", "rp1"); rpi != nil { + t.Fatalf("unexpected policy(1): %#v", rpi) + } + if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ShardGroupDuration: 7 * 24 * time.Hour}) { + t.Fatalf("unexpected policy(2): %#v", rpi) + } +} + +// Ensure the store can set the default retention policy on a database. +func TestStore_SetDefaultRetentionPolicy(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create database. + if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + + // Set default policy. + if err := s.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + // Ensure default policy is set. + if di, _ := s.Database("db0"); di.DefaultRetentionPolicy != "rp0" { + t.Fatalf("unexpected default retention policy: %s", di.DefaultRetentionPolicy) + } +} + +// Ensure the store can update a retention policy. +func TestStore_UpdateRetentionPolicy(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create database. + if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil { + t.Fatal(err) + } + + // Update policy. + var rpu meta.RetentionPolicyUpdate + rpu.SetName("rp1") + rpu.SetDuration(10 * time.Hour) + rpu.SetReplicaN(3) + if err := s.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil { + t.Fatal(err) + } + + // Ensure policy is updated. + if rpi, err := s.RetentionPolicy("db0", "rp1"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{ + Name: "rp1", + Duration: 10 * time.Hour, + ShardGroupDuration: 7 * 24 * time.Hour, + ReplicaN: 3, + }) { + t.Fatalf("unexpected policy: %#v", rpi) + } +} + +// Ensure the store can create a shard group on a retention policy. +func TestStore_CreateShardGroup(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create node & database. + if _, err := s.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } + + // Create policy on database. + if sgi, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } else if sgi.ID != 1 { + t.Fatalf("unexpected shard group: %#v", sgi) + } +} + +// Ensure the store can delete an existing shard group. +func TestStore_DeleteShardGroup(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create node, database, policy, & group. + if _, err := s.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + // Remove policy from database. + if err := s.DeleteShardGroup("db0", "rp0", 1); err != nil { + t.Fatal(err) + } +} + +// Ensure the store can create a new continuous query. +func TestStore_CreateContinuousQuery(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create query. + if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } +} + +// Ensure that creating an existing continuous query returns an error. +func TestStore_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create continuous query. + if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } + + // Create it again. + if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != meta.ErrContinuousQueryExists { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the store can delete a continuous query. +func TestStore_DropContinuousQuery(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create queries. + if err := s.CreateContinuousQuery("SELECT count() FROM foo"); err != nil { + t.Fatal(err) + } else if err = s.CreateContinuousQuery("SELECT count() FROM bar"); err != nil { + t.Fatal(err) + } else if err = s.CreateContinuousQuery("SELECT count() FROM baz"); err != nil { + t.Fatal(err) + } + + // Remove one of the queries. + if err := s.DropContinuousQuery("SELECT count() FROM bar"); err != nil { + t.Fatal(err) + } + + // Ensure the resulting queries are correct. + if a, err := s.ContinuousQueries(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(a, []meta.ContinuousQueryInfo{ + {Query: "SELECT count() FROM foo"}, + {Query: "SELECT count() FROM baz"}, + }) { + t.Fatalf("unexpected queries: %#v", a) + } +} + +// Ensure the store can create a user. +func TestStore_CreateUser(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create user. + if ui, err := s.CreateUser("susy", "pass", true); err != nil { + t.Fatal(err) + } else if ui.Name != "susy" || ui.Hash == "" || ui.Admin != true { + t.Fatalf("unexpected user: %#v", ui) + } +} + +// Ensure the store can remove a user. +func TestStore_DropUser(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create users. + if _, err := s.CreateUser("susy", "pass", true); err != nil { + t.Fatal(err) + } else if _, err := s.CreateUser("bob", "pass", true); err != nil { + t.Fatal(err) + } + + // Remove user. + if err := s.DropUser("bob"); err != nil { + t.Fatal(err) + } + + // Verify user was removed. + if a, err := s.Users(); err != nil { + t.Fatal(err) + } else if len(a) != 1 { + t.Fatalf("unexpected user count: %d", len(a)) + } else if a[0].Name != "susy" { + t.Fatalf("unexpected user: %s", a[0].Name) + } +} + +// Ensure the store can update a user. +func TestStore_UpdateUser(t *testing.T) { + s := MustOpenStore() + defer s.Close() + <-s.LeaderCh() + + // Create users. + if _, err := s.CreateUser("susy", "pass", true); err != nil { + t.Fatal(err) + } else if _, err := s.CreateUser("bob", "pass", true); err != nil { + t.Fatal(err) + } + + // Store password hash for bob. + ui, err := s.User("bob") + if err != nil { + t.Fatal(err) + } + + // Update user. + if err := s.UpdateUser("bob", "XXX"); err != nil { + t.Fatal(err) + } + + // Verify password hash was updated. + if other, err := s.User("bob"); err != nil { + t.Fatal(err) + } else if ui.Hash == other.Hash { + t.Fatal("password hash did not change") + } } // Store is a test wrapper for meta.Store.