add a way to tell if the cluster is in sync or not
parent
d199866ae4
commit
69c19018e2
|
@ -138,6 +138,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
|
|||
self.registerEndpoint(p, "get", "/cluster/shards", self.getShards)
|
||||
self.registerEndpoint(p, "del", "/cluster/shards/:id", self.dropShard)
|
||||
|
||||
// return whether the cluster is in sync or not
|
||||
self.registerEndpoint(p, "get", "/sync", self.isInSync)
|
||||
|
||||
if listener == nil {
|
||||
self.startSsl(p)
|
||||
return
|
||||
|
@ -950,6 +953,22 @@ func (self *HttpServer) getShards(w libhttp.ResponseWriter, r *libhttp.Request)
|
|||
})
|
||||
}
|
||||
|
||||
// Note: this is meant for testing purposes only and doesn't guarantee
|
||||
// data integrity and shouldn't be used in client code.
|
||||
func (self *HttpServer) isInSync(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
|
||||
if self.clusterConfig.HasUncommitedWrites() {
|
||||
return 500, "false"
|
||||
}
|
||||
|
||||
if !self.raftServer.CommittedAllChanges() {
|
||||
return 500, "false"
|
||||
}
|
||||
|
||||
return 200, "true"
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) dropShard(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
|
||||
id, err := strconv.ParseInt(r.URL.Query().Get(":id"), 10, 64)
|
||||
|
|
|
@ -82,6 +82,7 @@ type ClusterConfiguration struct {
|
|||
shardsById map[uint32]*ShardData
|
||||
shardsByIdLock sync.RWMutex
|
||||
LocalRaftName string
|
||||
writeBuffers []*WriteBuffer
|
||||
}
|
||||
|
||||
type ContinuousQuery struct {
|
||||
|
@ -198,6 +199,16 @@ func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connection
|
|||
return nil
|
||||
}
|
||||
|
||||
// Return per shard request numbers for the local server and all remote servers
|
||||
func (self *ClusterConfiguration) HasUncommitedWrites() bool {
|
||||
for _, buffer := range self.writeBuffers {
|
||||
if buffer.HasUncommitedWrites() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
|
||||
self.serversLock.Lock()
|
||||
defer self.serversLock.Unlock()
|
||||
|
@ -212,7 +223,9 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
|
|||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
server.Connect()
|
||||
}
|
||||
server.SetWriteBuffer(NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
writeBuffer := NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize)
|
||||
self.writeBuffers = append(self.writeBuffers, writeBuffer)
|
||||
server.SetWriteBuffer(writeBuffer)
|
||||
server.StartHeartbeat()
|
||||
} else if !self.addedLocalServer {
|
||||
log.Info("Added the local server")
|
||||
|
@ -509,7 +522,9 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
if server.connection == nil {
|
||||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
|
||||
server.SetWriteBuffer(NewWriteBuffer(fmt.Sprintf("server: %d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
writeBuffer := NewWriteBuffer(fmt.Sprintf("server: %d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize)
|
||||
self.writeBuffers = append(self.writeBuffers, writeBuffer)
|
||||
server.SetWriteBuffer(writeBuffer)
|
||||
server.Connect()
|
||||
server.StartHeartbeat()
|
||||
}
|
||||
|
@ -927,7 +942,9 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32)
|
|||
}
|
||||
|
||||
func (self *ClusterConfiguration) RecoverFromWAL() error {
|
||||
self.shardStore.SetWriteBuffer(NewWriteBuffer("local", self.shardStore, self.wal, self.LocalServerId, self.config.LocalStoreWriteBufferSize))
|
||||
writeBuffer := NewWriteBuffer("local", self.shardStore, self.wal, self.LocalServerId, self.config.LocalStoreWriteBufferSize)
|
||||
self.writeBuffers = append(self.writeBuffers, writeBuffer)
|
||||
self.shardStore.SetWriteBuffer(writeBuffer)
|
||||
var waitForAll sync.WaitGroup
|
||||
for _, _server := range self.servers {
|
||||
server := _server
|
||||
|
|
|
@ -2,6 +2,7 @@ package cluster
|
|||
|
||||
import (
|
||||
"protocol"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
log "code.google.com/p/log4go"
|
||||
|
@ -9,14 +10,16 @@ import (
|
|||
|
||||
// Acts as a buffer for writes
|
||||
type WriteBuffer struct {
|
||||
writer Writer
|
||||
wal WAL
|
||||
serverId uint32
|
||||
writes chan *protocol.Request
|
||||
stoppedWrites chan uint32
|
||||
bufferSize int
|
||||
shardIds map[uint32]bool
|
||||
writerInfo string
|
||||
writer Writer
|
||||
wal WAL
|
||||
serverId uint32
|
||||
writes chan *protocol.Request
|
||||
stoppedWrites chan uint32
|
||||
bufferSize int
|
||||
shardIds map[uint32]bool
|
||||
shardLastRequestNumber map[uint32]uint32
|
||||
shardCommitedRequestNumber map[uint32]uint32
|
||||
writerInfo string
|
||||
}
|
||||
|
||||
type Writer interface {
|
||||
|
@ -26,22 +29,33 @@ type Writer interface {
|
|||
func NewWriteBuffer(writerInfo string, writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer {
|
||||
log.Info("%s: Initializing write buffer with buffer size of %d", writerInfo, bufferSize)
|
||||
buff := &WriteBuffer{
|
||||
writer: writer,
|
||||
wal: wal,
|
||||
serverId: serverId,
|
||||
writes: make(chan *protocol.Request, bufferSize),
|
||||
stoppedWrites: make(chan uint32, 1),
|
||||
bufferSize: bufferSize,
|
||||
shardIds: make(map[uint32]bool),
|
||||
writerInfo: writerInfo,
|
||||
writer: writer,
|
||||
wal: wal,
|
||||
serverId: serverId,
|
||||
writes: make(chan *protocol.Request, bufferSize),
|
||||
stoppedWrites: make(chan uint32, 1),
|
||||
bufferSize: bufferSize,
|
||||
shardIds: make(map[uint32]bool),
|
||||
shardLastRequestNumber: map[uint32]uint32{},
|
||||
shardCommitedRequestNumber: map[uint32]uint32{},
|
||||
writerInfo: writerInfo,
|
||||
}
|
||||
go buff.handleWrites()
|
||||
return buff
|
||||
}
|
||||
|
||||
func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32 {
|
||||
return self.shardLastRequestNumber
|
||||
}
|
||||
|
||||
func (self *WriteBuffer) HasUncommitedWrites() bool {
|
||||
return !reflect.DeepEqual(self.shardCommitedRequestNumber, self.shardLastRequestNumber)
|
||||
}
|
||||
|
||||
// This method never blocks. It'll buffer writes until they fill the buffer then drop the on the
|
||||
// floor and let the background goroutine replay from the WAL
|
||||
func (self *WriteBuffer) Write(request *protocol.Request) {
|
||||
self.shardLastRequestNumber[request.GetShardId()] = request.GetRequestNumber()
|
||||
select {
|
||||
case self.writes <- request:
|
||||
return
|
||||
|
@ -73,6 +87,7 @@ func (self *WriteBuffer) write(request *protocol.Request) {
|
|||
requestNumber := *request.RequestNumber
|
||||
err := self.writer.Write(request)
|
||||
if err == nil {
|
||||
self.shardCommitedRequestNumber[request.GetShardId()] = request.GetRequestNumber()
|
||||
self.wal.Commit(requestNumber, self.serverId)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -318,6 +318,12 @@ func (s *RaftServer) CompactLog() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *RaftServer) CommittedAllChanges() bool {
|
||||
entries := s.raftServer.LogEntries()
|
||||
lastIndex := entries[len(entries)-1].Index()
|
||||
return s.raftServer.CommitIndex() == lastIndex
|
||||
}
|
||||
|
||||
func (s *RaftServer) startRaft() error {
|
||||
log.Info("Initializing Raft Server: %s %d", s.path, s.port)
|
||||
|
||||
|
|
Loading…
Reference in New Issue