The lock in coordinator.handleClusterWrite ensures that requests get sent to replicas in sequence.
parent aa3443322a
commit 62e420ee8b
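The idea behind the fix: the sequence number is assigned and the request is sent to the replicas while a single lock is held, so two concurrent writers can never interleave their sends and deliver sequence numbers out of order. A minimal standalone sketch of that pattern (names like `coordinator`, `replica`, and `handleWrite` are invented for illustration, not the actual InfluxDB code):

```go
package main

import (
	"fmt"
	"sync"
)

// replica stands in for a remote server; it only records the order in which
// requests arrive. All names in this sketch are illustrative.
type replica struct {
	mu   sync.Mutex
	seen []uint64
}

func (r *replica) send(seq uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen = append(r.seen, seq)
}

// coordinator mirrors the idea behind CoordinatorImpl.writeLock: the lock is
// held from sequence-number assignment all the way through the send, so two
// concurrent writers cannot deliver sequence numbers out of order.
type coordinator struct {
	writeLock sync.Mutex
	nextSeq   uint64
}

func (c *coordinator) handleWrite(r *replica) {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()
	c.nextSeq++
	r.send(c.nextSeq) // still inside the critical section
}

func main() {
	c := &coordinator{}
	r := &replica{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.handleWrite(r)
		}()
	}
	wg.Wait()
	fmt.Println(r.seen[:5]) // always [1 2 3 4 5]: in order, with no gaps
}
```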
@@ -174,6 +174,7 @@
 - [Issue #117](https://github.com/influxdb/influxdb/issues/117). Fill empty groups with default values
 - [Issue #150](https://github.com/influxdb/influxdb/pull/150). Fix parser for when multiple divisions look like a regex.
 - [Issue #158](https://github.com/influxdb/influxdb/issues/158). Logged deletes should be stored with the time range if missing.
+- [Issue #136](https://github.com/influxdb/influxdb/issues/136). Make sure writes are replicated in order to avoid triggering replays

 ### Deprecated

@@ -22,6 +22,7 @@ type CoordinatorImpl struct {
     requestId          uint32
     runningReplays     map[string][]*protocol.Request
     runningReplaysLock sync.Mutex
+    writeLock          sync.Mutex
 }

 // this is the key used for the persistent atomic ints for sequence numbers

@@ -501,7 +502,7 @@ func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, o
 }

 func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) {
-    log.Warn("COORDINATOR: ReplayReplication: %v, %v, %v, %v", request, *replicationFactor, *owningServerId, *lastSeenSequenceNumber)
+    log.Warn("COORDINATOR: ReplayReplication: SN: %d, LS: %d, RF: %d, OS: %d", *request.SequenceNumber, *lastSeenSequenceNumber, *replicationFactor, *owningServerId)
     key := fmt.Sprintf("%d_%d_%d_%d", *replicationFactor, *request.ClusterVersion, *request.OriginatingServerId, *owningServerId)
     self.runningReplaysLock.Lock()
     requestsWaitingToWrite := self.runningReplays[key]

@@ -536,7 +537,7 @@ func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replic
     server := self.clusterConfiguration.GetServerById(request.OriginatingServerId)
     err := server.MakeRequest(replayRequest, replayedRequests)
     if err != nil {
-        log.Error(err)
+        log.Error("REPLAY ERROR: ", err)
         return
     }
     for {

@@ -706,6 +707,12 @@ func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protoco
 }

 func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, series *protocol.Series) error {
+    // TODO: Figure out how to not need this lock. Shouldn't have to lock to send on a connection. However,
+    // when the server is under load, replication requests can get out of order, which triggers a replay.
+    // Maybe we need a special channel for replication?
+    self.writeLock.Lock()
+    defer self.writeLock.Unlock()
+
     owner, servers := self.clusterConfiguration.GetServersByIndexAndReplicationFactor(db, serverIndex)

     request := self.createRequest(proxyWrite, db)

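The TODO in this hunk suggests a dedicated replication channel as a way to avoid the coarse write lock. A rough, hypothetical sketch of that idea (the types `replicationRequest` and `replicationQueue` are invented here and are not part of this commit): a single sender goroutine drains a channel per connection, so the wire order matches the enqueue order without holding a global lock around the network send.

```go
package main

import (
	"fmt"
	"sync"
)

// replicationRequest and replicationQueue are invented for this sketch; they
// are not part of the commit. The point: one sender goroutine draining a
// channel preserves enqueue order without locking around the network send.
type replicationRequest struct {
	sequenceNumber uint64
}

type replicationQueue struct {
	requests chan *replicationRequest
	done     sync.WaitGroup
}

func newReplicationQueue(send func(*replicationRequest) error) *replicationQueue {
	q := &replicationQueue{requests: make(chan *replicationRequest, 1024)}
	q.done.Add(1)
	go func() {
		defer q.done.Done()
		for r := range q.requests {
			if err := send(r); err != nil {
				// The real coordinator would log this and let the replica ask for a replay.
				fmt.Println("replication send failed:", err)
			}
		}
	}()
	return q
}

func (q *replicationQueue) enqueue(r *replicationRequest) { q.requests <- r }

// close drains whatever is queued and stops the sender goroutine.
func (q *replicationQueue) close() {
	close(q.requests)
	q.done.Wait()
}

func main() {
	q := newReplicationQueue(func(r *replicationRequest) error {
		fmt.Println("sending sequence number", r.sequenceNumber)
		return nil
	})
	for seq := uint64(1); seq <= 5; seq++ {
		q.enqueue(&replicationRequest{sequenceNumber: seq})
	}
	q.close() // prints 1 through 5, in order
}
```

Even with a queue like this, assigning the sequence number and enqueueing would still have to happen together atomically, so a small critical section remains; the gain is that the slow network send moves out of it.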
@@ -759,7 +766,11 @@ func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, request *p
     defer func() { request.OriginatingServerId = originatingServerId }()

     responseChan := make(chan *protocol.Response, 1)
-    clusterServer.MakeRequest(request, responseChan)
+    err := clusterServer.MakeRequest(request, responseChan)
+    if err != nil {
+        log.Warn("PROXY WRITE ERROR: ", err)
+        return err
+    }
     response := <-responseChan
     if *response.Type == protocol.Response_WRITE_OK {
         return nil

@@ -1052,7 +1063,10 @@ func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error {
 func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*ClusterServer) {
     for _, server := range replicas {
         if server.Id != self.clusterConfiguration.localServerId {
-            server.MakeRequest(request, nil)
+            err := server.MakeRequest(request, nil)
+            if err != nil {
+                log.Warn("REPLICATION ERROR: ", request.GetSequenceNumber(), err)
+            }
         }
     }
 }

@@ -80,7 +80,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
     if err != nil {
         switch err := err.(type) {
         case datastore.SequenceMissingRequestsError:
-            log.Warn("Missing sequence number error: %v", err)
+            log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence)
             go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
             return nil
         default:

@@ -501,7 +501,9 @@ func (self *LevelDbDatastore) LogRequestAndAssignSequenceNumber(request *protoco
     if err != nil {
         return err
     }
-    if previousSequenceNumber+uint64(1) != *request.SequenceNumber {
+    // Do a less than comparison because it's ok if we're just getting the same write again. As long as we haven't missed one.
+    if previousSequenceNumber+uint64(1) < *request.SequenceNumber {
         log.Warn("MISSING REQUESTS: ", previousSequenceNumber, *request.SequenceNumber)
         return SequenceMissingRequestsError{"Missing requests between last seen and this one.", previousSequenceNumber, *request.SequenceNumber}
     }
 }
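The "less than" comparison in the hunk above treats a re-delivered sequence number as harmless and only reports an error when requests were actually skipped. A tiny illustration of that distinction (a hypothetical `checkSequence` helper, not the datastore code):

```go
package main

import "fmt"

// checkSequence is a hypothetical helper, not the LevelDbDatastore code. It
// mirrors the "less than" idea above: an already-seen sequence number is a
// harmless re-delivery, the next expected number is applied, and only a jump
// past lastSeen+1 means requests were missed and a replay is needed.
func checkSequence(lastSeen, incoming uint64) string {
	switch {
	case lastSeen+1 < incoming:
		return "gap: requests missing, trigger replay"
	case incoming == lastSeen+1:
		return "next expected request: apply"
	default:
		return "duplicate or older write: safe to ignore"
	}
}

func main() {
	fmt.Println(checkSequence(5, 6)) // next expected request: apply
	fmt.Println(checkSequence(5, 5)) // duplicate or older write: safe to ignore
	fmt.Println(checkSequence(5, 9)) // gap: requests missing, trigger replay
}
```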