Writes to non-replicated shards shouldn't be buffered or use the WAL

Currently all writes (unless synchronous) have to go through the write
buffer and the WAL in order to guarantee proper replication. This commit
changes the behavior for shards that aren't replicated.

Fix #734.
pull/734/head
John Shahid 2014-07-11 12:44:41 -04:00
parent 3fb936f43a
commit 0b6371e431
6 changed files with 50 additions and 8 deletions
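
Since the WAL exists to replay requests to replicas that fall behind, a shard with nothing to replicate gains nothing from buffering its writes. The sketch below summarizes the routing this commit adds to the coordinator; the `Shard` methods (`Write`, `SyncWrite`, `ReplicationFactor`) match the hunks further down, while `routeWrite`, `fakeShard`, and the trimmed `Request` type are illustrative stand-ins rather than the real implementation.

```go
package main

import "fmt"

// Illustrative stand-in for protocol.Request; the real type carries the
// series data, shard id, and sequence numbers.
type Request struct{}

type Shard interface {
	Write(*Request) error
	SyncWrite(req *Request, assignSeqNum bool) error
	ReplicationFactor() int
}

// routeWrite mirrors the coordinator change below: synchronous writes (from
// continuous queries) already carry sequence numbers, a non-replicated shard
// gets a synchronous write with sequence numbers assigned on the spot, and
// only replicated shards go through the buffered, WAL-backed path.
func routeWrite(shard Shard, request *Request, sync bool) error {
	if sync {
		return shard.SyncWrite(request, false)
	}
	if shard.ReplicationFactor() <= 1 {
		return shard.SyncWrite(request, true)
	}
	return shard.Write(request)
}

// fakeShard is a toy implementation used only to show which path is taken.
type fakeShard struct{ replicas int }

func (s *fakeShard) ReplicationFactor() int { return s.replicas }
func (s *fakeShard) Write(*Request) error   { fmt.Println("buffered WAL write"); return nil }
func (s *fakeShard) SyncWrite(_ *Request, assignSeqNum bool) error {
	fmt.Println("synchronous write, assignSeqNum =", assignSeqNum)
	return nil
}

func main() {
	_ = routeWrite(&fakeShard{replicas: 1}, &Request{}, false) // synchronous write, assignSeqNum = true
	_ = routeWrite(&fakeShard{replicas: 3}, &Request{}, false) // buffered WAL write
}
```

With a replication factor of 1 there is no peer that would ever replay the request from the WAL, so buffering only adds latency; that is the rationale the routing above encodes.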

View File

@@ -20,6 +20,7 @@
- [Issue #731](https://github.com/influxdb/influxdb/issues/731). Don't enable the udp plugin if the `enabled` option is set to false
- [Issue #733](https://github.com/influxdb/influxdb/issues/733). Print an `INFO` message when the input plugin is disabled
- [Issue #707](https://github.com/influxdb/influxdb/issues/707). Graphite input plugin should work with payloads delimited by any whitespace character
- [Issue #734](https://github.com/influxdb/influxdb/issues/734). Don't buffer non-replicated writes
## v0.7.3 [2014-06-13]

View File

@@ -36,6 +36,7 @@ type QuerySpec interface {
type WAL interface {
AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
AssignSequenceNumbers(request *protocol.Request) error
Commit(requestNumber uint32, serverId uint32) error
CreateCheckpoint() error
RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error

View File

@@ -25,8 +25,9 @@ type Shard interface {
StartTime() time.Time
EndTime() time.Time
Write(*p.Request) error
SyncWrite(*p.Request) error
SyncWrite(req *p.Request, assignSeqNum bool) error
Query(querySpec *parser.QuerySpec, response chan *p.Response)
ReplicationFactor() int
IsMicrosecondInRange(t int64) bool
}
@@ -160,6 +161,13 @@ func (self *ShardData) SetServers(servers []*ClusterServer) {
self.sortServerIds()
}
func (self *ShardData) ReplicationFactor() int {
if self.store != nil {
return len(self.clusterServers) + 1
}
return len(self.clusterServers)
}
func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error {
self.serverIds = append(self.serverIds, localServerId)
self.localServerId = localServerId
@@ -192,7 +200,11 @@ func (self *ShardData) DropFields(fields []*metastore.Field) error {
return shard.DropFields(fields)
}
func (self *ShardData) SyncWrite(request *p.Request) error {
func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error {
if assignSeqNum {
self.wal.AssignSequenceNumbers(request)
}
request.ShardId = &self.id
for _, server := range self.clusterServers {
if err := server.Write(request); err != nil {

View File

@@ -725,8 +725,17 @@ func (self *CoordinatorImpl) writeWithoutAssigningId(db string, series []*protoc
s2 := &protocol.Series{Name: s.Name, FieldIds: s.FieldIds, Points: s.Points[l/2:]}
return self.writeWithoutAssigningId(db, []*protocol.Series{s2}, shard, sync)
}
// if we received a synchronous write, then it's coming from the
// continuous queries, which already have sequence numbers assigned
if sync {
return shard.SyncWrite(request)
return shard.SyncWrite(request, false)
}
// If the shard isn't replicated, do a synchronous write
if shard.ReplicationFactor() <= 1 {
// assign sequence numbers and write synchronously
return shard.SyncWrite(request, true)
}
return shard.Write(request)
}

View File

@@ -21,7 +21,8 @@ type commitEntry struct {
}
type appendEntry struct {
confirmation chan *confirmation
request *protocol.Request
shardId uint32
confirmation chan *confirmation
request *protocol.Request
shardId uint32
assignSeqOnly bool
}

View File

@@ -292,6 +292,12 @@ func (self *WAL) assignSequenceNumbers(shardId uint32, request *protocol.Request
func (self *WAL) processAppendEntry(e *appendEntry) {
nextRequestNumber := self.state.getNextRequestNumber()
e.request.RequestNumber = proto.Uint32(nextRequestNumber)
self.assignSequenceNumbers(e.shardId, e.request)
if e.assignSeqOnly {
e.confirmation <- &confirmation{e.request.GetRequestNumber(), nil}
return
}
if len(self.logFiles) == 0 {
if _, err := self.createNewLog(nextRequestNumber); err != nil {
@@ -302,7 +308,6 @@ func (self *WAL) processAppendEntry(e *appendEntry) {
}
lastLogFile := self.logFiles[len(self.logFiles)-1]
self.assignSequenceNumbers(e.shardId, e.request)
logger.Debug("appending request %d", e.request.GetRequestNumber())
err := lastLogFile.appendRequest(e.request, e.shardId)
if err != nil {
@@ -396,7 +401,7 @@ func (self *WAL) openLog(logFileName string) (*log, *index, error) {
// should be marked as committed for each server as it gets confirmed.
func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Shard) (uint32, error) {
confirmationChan := make(chan *confirmation)
self.entries <- &appendEntry{confirmationChan, request, shard.Id()}
self.entries <- &appendEntry{confirmationChan, request, shard.Id(), false}
confirmation := <-confirmationChan
// we should panic if the wal cannot append the request
@@ -406,6 +411,19 @@ func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Sh
return confirmation.requestNumber, confirmation.err
}
// Assigns sequence numbers if null.
func (self *WAL) AssignSequenceNumbers(request *protocol.Request) error {
confirmationChan := make(chan *confirmation)
self.entries <- &appendEntry{confirmationChan, request, 0, true}
confirmation := <-confirmationChan
// we should panic if the wal cannot assign sequence numbers to the request
if confirmation.err != nil {
panic(confirmation.err)
}
return nil
}
// returns the first log file that contains the given request number
func (self *WAL) firstLogFile() int {
for idx, logIndex := range self.logIndex {
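
Taken together, the WAL now has two entry points: `AssignSequenceNumbersAndLog` still appends the request to a log file, while the new `AssignSequenceNumbers` pushes an entry flagged `assignSeqOnly` through the same channel and returns as soon as sequence numbers are assigned. A minimal sketch of that short-circuit follows, assuming trimmed stand-in types (the real `appendEntry`, `confirmation`, and WAL structs carry more state):

```go
package main

import "fmt"

// Trimmed, illustrative stand-ins for the WAL plumbing shown in the hunks above.
type confirmation struct {
	requestNumber uint32
	err           error
}

type appendEntry struct {
	confirmation  chan *confirmation
	requestNumber *uint32 // stands in for request.RequestNumber
	shardId       uint32
	assignSeqOnly bool
}

type wal struct{ next uint32 }

// processAppendEntry always assigns the next request number (and, in the real
// code, per-point sequence numbers), but it skips the log-file append and
// confirms immediately when the entry only needs sequence numbers, which is
// the path taken for writes to non-replicated shards.
func (w *wal) processAppendEntry(e *appendEntry) {
	w.next++
	*e.requestNumber = w.next
	if e.assignSeqOnly {
		e.confirmation <- &confirmation{w.next, nil}
		return
	}
	// ... the real code appends the request to the current log file here ...
	e.confirmation <- &confirmation{w.next, nil}
}

func main() {
	w := &wal{}
	var reqNum uint32
	done := make(chan *confirmation, 1)
	// In the real WAL this entry is sent over the entries channel by
	// AssignSequenceNumbers; it is handled directly here for brevity.
	w.processAppendEntry(&appendEntry{done, &reqNum, 0, true})
	c := <-done
	fmt.Printf("request %d got sequence numbers without being logged\n", c.requestNumber)
}
```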