Update the coordinator to set sequence number and time

pull/17/head
Paul Dix 2013-10-18 14:53:47 -04:00
parent 2bfbe99018
commit 47d27cc505
4 changed files with 143 additions and 4 deletions

50
src/common/checkers.go Normal file
View File

@ -0,0 +1,50 @@
package common
import (
"fmt"
. "launchpad.net/gocheck"
)
type inRangeChecker struct {
*CheckerInfo
}
var InRange Checker = &inRangeChecker{
&CheckerInfo{Name: "InRange", Params: []string{"obtained", "expected greater than", "expected less than"}},
}
func (checker *inRangeChecker) Check(params []interface{}, names []string) (result bool, error string) {
defer func() {
if v := recover(); v != nil {
result = false
error = fmt.Sprint(v)
}
}()
switch params[0].(type) {
default:
return false, "can't compare range for type"
case int64:
p1 := params[0].(int64)
p2 := params[1].(int64)
p3 := params[2].(int64)
if p2 > p1 {
return false, ""
}
if p3 < p1 {
return false, ""
}
case float64:
p1 := params[0].(float64)
p2 := params[1].(float64)
p3 := params[2].(float64)
if p2 > p1 {
return false, ""
}
if p3 < p1 {
return false, ""
}
}
return true, ""
}

View File

@ -3,6 +3,7 @@ package common
import (
"encoding/json"
"protocol"
"time"
)
func StringToSeriesArray(seriesString string) ([]*protocol.Series, error) {
@ -10,3 +11,7 @@ func StringToSeriesArray(seriesString string) ([]*protocol.Series, error) {
err := json.Unmarshal([]byte(seriesString), &series)
return series, err
}
func CurrentTime() int64 {
return time.Now().UnixNano() / int64(1000)
}

View File

@ -1,19 +1,23 @@
package coordinator
import (
"common"
"datastore"
"parser"
"protocol"
"sync"
)
type CoordinatorImpl struct {
clusterConfiguration *ClusterConfiguration
raftServer *RaftServer
datastore datastore.Datastore
clusterConfiguration *ClusterConfiguration
raftServer *RaftServer
datastore datastore.Datastore
currentSequenceNumber uint32
sequenceNumberLock sync.Mutex
}
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer *RaftServer, clusterConfiguration *ClusterConfiguration) Coordinator {
return &CoordinatorImpl{clusterConfiguration, raftServer, datastore}
return &CoordinatorImpl{clusterConfiguration: clusterConfiguration, raftServer: raftServer, datastore: datastore}
}
func (self *CoordinatorImpl) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error {
@ -21,5 +25,22 @@ func (self *CoordinatorImpl) DistributeQuery(db string, query *parser.Query, yie
}
func (self *CoordinatorImpl) WriteSeriesData(db string, series *protocol.Series) error {
now := common.CurrentTime()
for _, p := range series.Points {
if p.Timestamp == nil {
p.Timestamp = &now
self.sequenceNumberLock.Lock()
self.currentSequenceNumber += 1
n := self.currentSequenceNumber
self.sequenceNumberLock.Unlock()
p.SequenceNumber = &n
} else if p.SequenceNumber == nil {
self.sequenceNumberLock.Lock()
self.currentSequenceNumber += 1
n := self.currentSequenceNumber
self.sequenceNumberLock.Unlock()
p.SequenceNumber = &n
}
}
return self.datastore.WriteSeriesData(db, series)
}

View File

@ -1,11 +1,15 @@
package coordinator
import (
. "common"
"datastore"
"encoding/json"
"fmt"
"io/ioutil"
. "launchpad.net/gocheck"
"net/http"
"os"
"protocol"
"strings"
"testing"
"time"
@ -28,6 +32,23 @@ const (
REPLICATION_LAG = time.Millisecond * 500
)
type DatastoreMock struct {
datastore.Datastore
Series *protocol.Series
}
func (self *DatastoreMock) WriteSeriesData(database string, series *protocol.Series) error {
self.Series = series
return nil
}
func stringToSeries(seriesString string, c *C) *protocol.Series {
series := &protocol.Series{}
err := json.Unmarshal([]byte(seriesString), &series)
c.Assert(err, IsNil)
return series
}
func nextPort() int {
nextPortNum += 1
// this is a hack for OSX boxes running spotify. It binds to 127.0.0.1:8099. net.Listen doesn't return an
@ -424,3 +445,45 @@ func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
func (self *CoordinatorSuite) TestDistributesRingLocationsToNewServer(c *C) {
}
func (self *CoordinatorSuite) TestWillSetTimestampsAndSequenceNumbersForPointsWithout(c *C) {
datastoreMock := &DatastoreMock{}
coordinator := NewCoordinatorImpl(datastoreMock, nil, nil)
mock := `
{
"points": [
{
"values": [
{
"int64_value": 3
}
],
"sequence_number": 1,
"timestamp": 23423
}
],
"name": "foo",
"fields": [
{
"type": "INT64",
"name": "value"
}
]
}`
series := stringToSeries(mock, c)
coordinator.WriteSeriesData("foo", series)
c.Assert(datastoreMock.Series, DeepEquals, series)
mock = `{
"points": [{"values": [{"int64_value": 3}]}],
"name": "foo",
"fields": [{"type": "INT64","name": "value"}]
}`
series = stringToSeries(mock, c)
beforeTime := CurrentTime()
coordinator.WriteSeriesData("foo", series)
afterTime := CurrentTime()
c.Assert(datastoreMock.Series, Not(DeepEquals), stringToSeries(mock, c))
c.Assert(*datastoreMock.Series.Points[0].SequenceNumber, Equals, uint32(1))
t := *datastoreMock.Series.Points[0].Timestamp
c.Assert(t, InRange, beforeTime, afterTime)
}