assign sequence numbers to points.

pull/269/head
John Shahid 2014-02-12 05:02:37 -05:00
parent 3ad2a5edf2
commit 498e09162a
2 changed files with 27 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package wal
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"io"
"os"
@ -19,7 +20,7 @@ func newLog(file *os.File) (*log, error) {
l := &log{
entries: make(chan *entry, 10),
file: file,
state: &state{},
state: newState(),
}
l.recover()
@ -35,10 +36,23 @@ func (self *log) setServerId(serverId uint32) {
self.serverId = serverId
}
func (self *log) assignSequenceNumbers(shardId uint32, request *protocol.Request) {
sequenceNumber := self.state.getCurrentSequenceNumber(shardId)
for _, p := range request.Series.Points {
if p.SequenceNumber != nil {
continue
}
sequenceNumber++
p.SequenceNumber = proto.Uint64(sequenceNumber)
}
self.state.setCurrentSequenceNumber(shardId, sequenceNumber)
}
func (self *log) processEntries() {
for {
select {
case x := <-self.entries:
self.assignSequenceNumbers(x.shardId, x.request)
bytes, err := x.request.Encode()
if err != nil {
x.confirmation <- &confirmation{0, err}

View File

@ -7,20 +7,29 @@ const (
type state struct {
version byte
currentRequestNumber uint32
shardLastSequenceNumber map[uint32]uint32
shardLastSequenceNumber map[uint32]uint64
serverLastRequestNumber map[uint32]uint32
}
func newState() *state {
return &state{
version: CURRENT_VERSION,
currentRequestNumber: 0,
shardLastSequenceNumber: make(map[uint32]uint64),
serverLastRequestNumber: make(map[uint32]uint32),
}
}
func (self *state) getNextRequestNumber() uint32 {
self.currentRequestNumber++
return self.currentRequestNumber
}
func (self *state) getCurrentSequenceNumber(shardId uint32) uint32 {
func (self *state) getCurrentSequenceNumber(shardId uint32) uint64 {
return self.shardLastSequenceNumber[shardId]
}
func (self *state) setCurrentSequenceNumber(shardId, sequenceNumber uint32) {
func (self *state) setCurrentSequenceNumber(shardId uint32, sequenceNumber uint64) {
self.shardLastSequenceNumber[shardId] = sequenceNumber
}