fix #176. distribute drop database requests to all nodes in the cluster

pull/175/head
John Shahid 2014-01-09 18:11:17 -05:00
parent de6a3f574e
commit 79fc6fa31b
6 changed files with 155 additions and 42 deletions

View File

@ -39,15 +39,17 @@ var (
// shorter constants for readability
var (
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
queryRequest = protocol.Request_QUERY
listSeriesRequest = protocol.Request_LIST_SERIES
listSeriesResponse = protocol.Response_LIST_SERIES
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE
replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE
queryRequest = protocol.Request_QUERY
listSeriesRequest = protocol.Request_LIST_SERIES
listSeriesResponse = protocol.Response_LIST_SERIES
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER
)
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl {
@ -702,6 +704,38 @@ func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *Cluste
return self.proxyUntilSuccess(servers, request)
}
func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database string) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
request := self.createRequest(proxyDropDatabase, &database)
request.OriginatingServerId = &self.clusterConfiguration.localServerId
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
request.OwnerServerId = &owner.Id
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
request.ReplicationFactor = &replicationFactor
if server.Id == self.clusterConfiguration.localServerId {
// this is a local delete
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
if err != nil {
return self.proxyUntilSuccess(servers, request)
}
self.datastore.DropDatabase(database)
if err != nil {
log.Error("Couldn't write data to local store: ", err, request)
}
// ignoring the error because we still want to send to replicas
request.Type = &replicateDropDatabase
self.sendRequestToReplicas(request, servers)
return nil
}
// otherwise, proxy the request
return self.proxyUntilSuccess(servers, request)
}
func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protocol.Series) error {
return self.datastore.WriteSeriesData(*db, series)
}
@ -871,11 +905,26 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
return common.NewAuthorizationError("Insufficient permission to drop database")
}
if self.clusterConfiguration.IsSingleServer() {
if err := self.datastore.DropDatabase(db); err != nil {
return err
}
} else {
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
for _, server := range servers {
if err := self.handleDropDatabase(server.server, db); err != nil {
return err
}
}
}
// don't delete the metadata, we need the replication factor to be
// able to replicate the request properly
if err := self.raftServer.DropDatabase(db); err != nil {
return err
}
return self.datastore.DropDatabase(db)
return nil
}
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {

View File

@ -257,8 +257,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
}
// if the db is dropped it should remove the users as well
c.Assert(coordinator.DropDatabase(root, "db1"), IsNil)
c.Assert(coordinator.datastore.(*DatastoreMock).DroppedDatabase, Equals, "db1")
c.Assert(servers[0].DropDatabase("db1"), IsNil)
_, err = coordinator.AuthenticateDbUser("db1", "db_user", "pass")
c.Assert(err, ErrorMatches, ".*Invalid.*")
}

View File

@ -3,7 +3,6 @@ package coordinator
import (
"bytes"
log "code.google.com/p/log4go"
"common"
"datastore"
"encoding/binary"
"errors"
@ -34,12 +33,10 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
if *request.Type == protocol.Request_PROXY_WRITE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
location := common.RingLocation(request.Database, request.Series.Name, request.Series.Points[0].Timestamp)
ownerId := self.clusterConfig.GetOwnerIdByLocation(&location)
request.OriginatingServerId = &self.clusterConfig.localServerId
// TODO: make request logging and datastore write atomic
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, ownerId)
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
@ -51,6 +48,38 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
// TODO: add quorum writes?
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_PROXY_DROP_DATABASE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
request.OriginatingServerId = &self.clusterConfig.localServerId
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
err = self.db.DropDatabase(*request.Database)
if err != nil {
return err
}
err = self.WriteResponse(conn, response)
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_REPLICATION_DROP_DATABASE {
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
switch err := err.(type) {
case datastore.SequenceMissingRequestsError:
log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence)
go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
return nil
default:
return err
}
}
return self.db.DropDatabase(*request.Database)
} else if *request.Type == protocol.Request_PROXY_DELETE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

View File

@ -153,10 +153,15 @@ func (self *ServerProcess) Query(database, query string, onlyLocal bool, c *C) *
}
func (self *ServerProcess) Post(url, data string, c *C) *http.Response {
return self.Request("POST", url, data, c)
}
func (self *ServerProcess) Request(method, url, data string, c *C) *http.Response {
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.apiPort, url)
resp, err := http.Post(fullUrl, "application/json", bytes.NewBufferString(data))
req, err := http.NewRequest(method, fullUrl, bytes.NewBufferString(data))
c.Assert(err, IsNil)
resp, err := http.DefaultClient.Do(req)
c.Assert(err, IsNil)
time.Sleep(time.Millisecond * 10)
return resp
}
@ -277,6 +282,29 @@ func (self *ServerSuite) TestListSeries(c *C) {
}
}
func (self *ServerSuite) TestDropDatabase(c *C) {
self.serverProcesses[0].Post("/db?u=root&p=root", `{"name": "drop_db", "replicationFactor": 3}`, c)
self.serverProcesses[0].Post("/db/drop_db/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)
data := `[{
"name": "cluster_query",
"columns": ["val1"],
"points": [[1]]
}]`
self.serverProcesses[0].Post("/db/drop_db/series?u=paul&p=pass", data, c)
time.Sleep(time.Second)
resp := self.serverProcesses[0].Request("DELETE", "/db/drop_db?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusNoContent)
time.Sleep(time.Second)
self.serverProcesses[0].Post("/db?u=root&p=root", `{"name": "drop_db", "replicationFactor": 3}`, c)
self.serverProcesses[0].Post("/db/drop_db/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)
time.Sleep(time.Second)
for _, s := range self.serverProcesses {
fmt.Printf("Running query against: %d\n", s.apiPort)
collection := s.Query("drop_db", "select * from cluster_query", true, c)
c.Assert(collection.GetSeries("cluster_query", c).Points, HasLen, 0)
}
}
func (self *ServerSuite) TestFailureAndReplicationReplays(c *C) {
// write data and confirm that it went to all three servers
data := `

View File

@ -16,35 +16,41 @@ var _ = math.Inf
type Request_Type int32
const (
Request_QUERY Request_Type = 1
Request_REPLICATION_WRITE Request_Type = 2
Request_PROXY_WRITE Request_Type = 3
Request_REPLICATION_DELETE Request_Type = 4
Request_PROXY_DELETE Request_Type = 5
Request_REPLICATION_REPLAY Request_Type = 6
Request_LIST_SERIES Request_Type = 7
Request_SEQUENCE_NUMBER Request_Type = 8
Request_QUERY Request_Type = 1
Request_REPLICATION_WRITE Request_Type = 2
Request_PROXY_WRITE Request_Type = 3
Request_REPLICATION_DELETE Request_Type = 4
Request_PROXY_DELETE Request_Type = 5
Request_REPLICATION_REPLAY Request_Type = 6
Request_LIST_SERIES Request_Type = 7
Request_SEQUENCE_NUMBER Request_Type = 8
Request_PROXY_DROP_DATABASE Request_Type = 9
Request_REPLICATION_DROP_DATABASE Request_Type = 10
)
var Request_Type_name = map[int32]string{
1: "QUERY",
2: "REPLICATION_WRITE",
3: "PROXY_WRITE",
4: "REPLICATION_DELETE",
5: "PROXY_DELETE",
6: "REPLICATION_REPLAY",
7: "LIST_SERIES",
8: "SEQUENCE_NUMBER",
1: "QUERY",
2: "REPLICATION_WRITE",
3: "PROXY_WRITE",
4: "REPLICATION_DELETE",
5: "PROXY_DELETE",
6: "REPLICATION_REPLAY",
7: "LIST_SERIES",
8: "SEQUENCE_NUMBER",
9: "PROXY_DROP_DATABASE",
10: "REPLICATION_DROP_DATABASE",
}
var Request_Type_value = map[string]int32{
"QUERY": 1,
"REPLICATION_WRITE": 2,
"PROXY_WRITE": 3,
"REPLICATION_DELETE": 4,
"PROXY_DELETE": 5,
"REPLICATION_REPLAY": 6,
"LIST_SERIES": 7,
"SEQUENCE_NUMBER": 8,
"QUERY": 1,
"REPLICATION_WRITE": 2,
"PROXY_WRITE": 3,
"REPLICATION_DELETE": 4,
"PROXY_DELETE": 5,
"REPLICATION_REPLAY": 6,
"LIST_SERIES": 7,
"SEQUENCE_NUMBER": 8,
"PROXY_DROP_DATABASE": 9,
"REPLICATION_DROP_DATABASE": 10,
}
func (x Request_Type) Enum() *Request_Type {

View File

@ -34,6 +34,8 @@ message Request {
REPLICATION_REPLAY = 6;
LIST_SERIES = 7;
SEQUENCE_NUMBER = 8;
PROXY_DROP_DATABASE = 9;
REPLICATION_DROP_DATABASE = 10;
}
required uint32 id = 1;
required Type type = 2;