Change wal.Commit to take serverId
parent
74b151968d
commit
1bd8dc8da0
|
@ -32,7 +32,7 @@ type QuerySpec interface {
|
||||||
|
|
||||||
type WAL interface {
|
type WAL interface {
|
||||||
AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
|
AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
|
||||||
Commit(requestNumber uint32, server wal.Server) error
|
Commit(requestNumber uint32, serverId uint32) error
|
||||||
RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
|
RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
|
||||||
RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
|
RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
|
||||||
}
|
}
|
||||||
|
@ -987,6 +987,7 @@ func (self *ClusterConfiguration) RecoverFromWAL() error {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
// waitForAll.Wait()
|
// waitForAll.Wait()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *ClusterConfiguration) updateOrRemoveShard(shardId uint32, serverIds []uint32) {
|
func (self *ClusterConfiguration) updateOrRemoveShard(shardId uint32, serverIds []uint32) {
|
||||||
|
|
|
@ -311,7 +311,7 @@ func (self *ShardData) logAndHandleDestructiveQuery(querySpec *parser.QuerySpec,
|
||||||
if *res.Type == endStreamResponse {
|
if *res.Type == endStreamResponse {
|
||||||
// don't need to do a commit for the local datastore for now.
|
// don't need to do a commit for the local datastore for now.
|
||||||
if i < len(self.clusterServers) {
|
if i < len(self.clusterServers) {
|
||||||
self.wal.Commit(requestNumber, self.clusterServers[i])
|
self.wal.Commit(requestNumber, self.clusterServers[i].Id)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -369,7 +369,7 @@ func (self *ShardData) handleWritesToServer(server *ClusterServer, writeBuffer c
|
||||||
response := <-responseStream
|
response := <-responseStream
|
||||||
if *response.Type == protocol.Response_WRITE_OK {
|
if *response.Type == protocol.Response_WRITE_OK {
|
||||||
fmt.Println("COMMIT!")
|
fmt.Println("COMMIT!")
|
||||||
self.wal.Commit(requestNumber, server)
|
self.wal.Commit(requestNumber, server.Id)
|
||||||
} else {
|
} else {
|
||||||
// TODO: retry logic for failed request
|
// TODO: retry logic for failed request
|
||||||
log.Error("REQUEST to server %s failed:: ", server.ProtobufConnectionString, response.GetErrorMessage())
|
log.Error("REQUEST to server %s failed:: ", server.ProtobufConnectionString, response.GetErrorMessage())
|
||||||
|
|
|
@ -76,7 +76,7 @@ type WALMock struct {
|
||||||
func (self *WALMock) AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard, servers []wal.Server) (uint32, error) {
|
func (self *WALMock) AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard, servers []wal.Server) (uint32, error) {
|
||||||
return uint32(1), nil
|
return uint32(1), nil
|
||||||
}
|
}
|
||||||
func (self *WALMock) Commit(requestNumber uint32, server wal.Server) error {
|
func (self *WALMock) Commit(requestNumber uint32, serverId uint32) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (self *WALMock) RecoverFromLog(yield func(request *protocol.Request, shard wal.Shard, server wal.Server) error) error {
|
func (self *WALMock) RecoverFromLog(yield func(request *protocol.Request, shard wal.Shard, server wal.Server) error) error {
|
||||||
|
|
|
@ -80,9 +80,9 @@ func (self *WAL) SetServerId(id uint32) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Marks a given request for a given server as committed
|
// Marks a given request for a given server as committed
|
||||||
func (self *WAL) Commit(requestNumber uint32, server Server) error {
|
func (self *WAL) Commit(requestNumber uint32, serverId uint32) error {
|
||||||
lastLogFile := self.logFiles[len(self.logFiles)-1]
|
lastLogFile := self.logFiles[len(self.logFiles)-1]
|
||||||
lastLogFile.state.commitRequestNumber(server.GetId(), requestNumber)
|
lastLogFile.state.commitRequestNumber(serverId, requestNumber)
|
||||||
lowestCommitedRequestNumber := lastLogFile.state.LowestCommitedRequestNumber()
|
lowestCommitedRequestNumber := lastLogFile.state.LowestCommitedRequestNumber()
|
||||||
|
|
||||||
index := self.firstLogFile(lowestCommitedRequestNumber)
|
index := self.firstLogFile(lowestCommitedRequestNumber)
|
||||||
|
|
Loading…
Reference in New Issue