fix #310. Request should support multiple timeseries

pull/310/head
John Shahid 2014-04-07 16:30:52 -04:00
parent c116f13aae
commit 4acdc7bf34
12 changed files with 110 additions and 90 deletions

View File

@ -15,13 +15,14 @@ package graphite
import (
"bufio"
"cluster"
log "code.google.com/p/log4go"
. "common"
"configuration"
"coordinator"
"net"
"protocol"
"time"
log "code.google.com/p/log4go"
)
type Server struct {
@ -95,13 +96,14 @@ func (self *Server) Close() {
}
func (self *Server) writePoints(series *protocol.Series) error {
err := self.coordinator.WriteSeriesData(self.user, self.database, series)
serie := []*protocol.Series{series}
err := self.coordinator.WriteSeriesData(self.user, self.database, serie)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, series)
err = self.coordinator.WriteSeriesData(self.user, self.database, serie)
if err != nil {
log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
}

View File

@ -331,6 +331,7 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request
}
// convert the wire format to the internal representation of the time series
dataStoreSeries := make([]*protocol.Series, 0, len(serializedSeries))
for _, s := range serializedSeries {
if len(s.Points) == 0 {
continue
@ -341,12 +342,15 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request
return libhttp.StatusBadRequest, err.Error()
}
err = self.coordinator.WriteSeriesData(user, db, series)
dataStoreSeries = append(dataStoreSeries, series)
}
err = self.coordinator.WriteSeriesData(user, db, dataStoreSeries)
if err != nil {
return errorToStatusCode(err), err.Error()
}
}
return libhttp.StatusOK, nil
})
}

View File

@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
. "launchpad.net/gocheck"
"net"
libhttp "net/http"
"net/url"
@ -17,6 +16,7 @@ import (
"protocol"
"testing"
"time"
. "launchpad.net/gocheck"
)
// Hook up gocheck into the gotest runner.
@ -101,8 +101,8 @@ type MockCoordinator struct {
returnedError error
}
func (self *MockCoordinator) WriteSeriesData(_ User, db string, series *protocol.Series) error {
self.series = append(self.series, series)
func (self *MockCoordinator) WriteSeriesData(_ User, db string, series []*protocol.Series) error {
self.series = append(self.series, series...)
return nil
}

View File

@ -190,7 +190,7 @@ func (self *ShardData) Write(request *p.Request) error {
}
for _, server := range self.clusterServers {
// we have to create a new reqeust object because the ID gets assigned on each server.
requestWithoutId := &p.Request{Type: request.Type, Database: request.Database, Series: request.Series, ShardId: &self.id, RequestNumber: request.RequestNumber}
requestWithoutId := &p.Request{Type: request.Type, Database: request.Database, MultiSeries: request.MultiSeries, ShardId: &self.id, RequestNumber: request.RequestNumber}
server.BufferWrite(requestWithoutId)
}
return nil

View File

@ -3,10 +3,10 @@ package coordinator
import (
"encoding/binary"
"fmt"
. "launchpad.net/gocheck"
"net"
"protocol"
"time"
. "launchpad.net/gocheck"
)
type ClientServerSuite struct{}
@ -54,7 +54,7 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) {
id := uint32(1)
database := "pauldb"
proxyWrite := protocol.Request_WRITE
request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, Series: series}
request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, MultiSeries: []*protocol.Series{series}}
time.Sleep(time.Second * 1)
err := protobufClient.MakeRequest(request, responseStream)

View File

@ -442,20 +442,19 @@ func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
return self.raftServer.ForceLogCompaction()
}
func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error {
func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series []*protocol.Series) error {
if !user.HasWriteAccess(db) {
return common.NewAuthorizationError("Insufficient permissions to write to %s", db)
}
if len(series.Points) == 0 {
return fmt.Errorf("Can't write series with zero points.")
}
err := self.CommitSeriesData(db, series)
if err != nil {
return err
}
self.ProcessContinuousQueries(db, series)
for _, s := range series {
self.ProcessContinuousQueries(db, s)
}
return err
}
@ -529,7 +528,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
}
newSeries := &protocol.Series{Name: &cleanedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}}
if e := self.CommitSeriesData(db, newSeries); e != nil {
if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
log.Error("Couldn't write data for continuous query: ", e)
}
}
@ -544,7 +543,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
}
}
if e := self.CommitSeriesData(db, newSeries); e != nil {
if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
log.Error("Couldn't write data for continuous query: ", e)
}
}
@ -552,69 +551,74 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
return nil
}
func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error {
lastPointIndex := 0
func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Series) error {
now := common.CurrentTime()
var shardToWrite cluster.Shard
shardToSerieses := map[uint32]map[string]*protocol.Series{}
shardIdToShard := map[uint32]*cluster.ShardData{}
for _, series := range serieses {
if len(series.Points) == 0 {
return fmt.Errorf("Can't write series with zero points.")
}
for _, point := range series.Points {
if point.Timestamp == nil {
point.Timestamp = &now
}
}
lastTime := int64(math.MinInt64)
if len(series.Points) > 0 && *series.Points[0].Timestamp == lastTime {
// just a hack to make sure lastTime will never equal the first
// point's timestamp
lastTime = 0
}
// sort the points by timestamp
series.SortPointsTimeDescending()
for i, point := range series.Points {
if *point.Timestamp != lastTime {
shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *point.Timestamp)
for i := 0; i < len(series.Points); {
shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, series.GetName(), series.Points[i].GetTimestamp())
if err != nil {
return err
}
if shardToWrite == nil {
shardToWrite = shard
} else if shardToWrite.Id() != shard.Id() {
newIndex := i
newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[lastPointIndex:newIndex]}
if err := self.write(db, newSeries, shardToWrite); err != nil {
return err
firstIndex := i
timestamp := series.Points[i].GetTimestamp()
for ; i < len(series.Points) && series.Points[i].GetTimestamp() == timestamp; i++ {
// add all points with the same timestamp
}
lastPointIndex = newIndex
shardToWrite = shard
newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[firstIndex:i:i]}
shardIdToShard[shard.Id()] = shard
shardSerieses := shardToSerieses[shard.Id()]
if shardSerieses == nil {
shardSerieses = map[string]*protocol.Series{}
shardToSerieses[shard.Id()] = shardSerieses
}
lastTime = *point.Timestamp
seriesName := series.GetName()
s := shardSerieses[seriesName]
if s == nil {
shardSerieses[seriesName] = newSeries
continue
}
s.Points = append(s.Points, newSeries.Points...)
}
}
series.Points = series.Points[lastPointIndex:]
for id, serieses := range shardToSerieses {
shard := shardIdToShard[id]
if len(series.Points) > 0 {
if shardToWrite == nil {
shardToWrite, _ = self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *series.Points[0].Timestamp)
seriesesSlice := make([]*protocol.Series, 0, len(serieses))
for _, s := range serieses {
seriesesSlice = append(seriesesSlice, s)
}
err := self.write(db, series, shardToWrite)
err := self.write(db, seriesesSlice, shard)
if err != nil {
log.Error("COORD error writing: ", err)
return err
}
return err
}
return nil
}
func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard cluster.Shard) error {
request := &protocol.Request{Type: &write, Database: &db, Series: series}
func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard) error {
request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series}
return shard.Write(request)
}

View File

@ -629,7 +629,7 @@ func (self *CoordinatorSuite) TestCheckReadAccess(c *C) {
user := &MockUser{
dbCannotWrite: map[string]bool{"foo": true},
}
err := coordinator.WriteSeriesData(user, "foo", series)
err := coordinator.WriteSeriesData(user, "foo", []*protocol.Series{series})
c.Assert(err, ErrorMatches, ".*Insufficient permission.*")
}

View File

@ -16,7 +16,7 @@ type Coordinator interface {
// for all the data points that are returned
// 4. The end of a time series is signaled by returning a series with no data points
// 5. TODO: Aggregation on the nodes
WriteSeriesData(user common.User, db string, series *protocol.Series) error
WriteSeriesData(user common.User, db string, series []*protocol.Series) error
DropDatabase(user common.User, db string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ForceCompaction(user common.User) error

View File

@ -161,7 +161,13 @@ func (self *LevelDbShardDatastore) Write(request *protocol.Request) error {
return err
}
defer self.ReturnShard(*request.ShardId)
return shardDb.Write(*request.Database, request.Series)
for _, s := range request.MultiSeries {
err := shardDb.Write(*request.Database, s)
if err != nil {
return err
}
}
return nil
}
func (self *LevelDbShardDatastore) BufferWrite(request *protocol.Request) {

View File

@ -35,9 +35,9 @@ message Request {
optional uint32 id = 1;
required Type type = 2;
required string database = 3;
optional Series series = 4;
// only write and delete requests get sequenceNumbers assigned. These are used to
// ensure that the receiving server is up to date
repeated Series multi_series = 4;
optional uint64 sequence_number = 5;
optional uint32 shard_id = 6;
optional string query = 7;

View File

@ -263,11 +263,12 @@ func (self *WAL) processEntries() {
}
func (self *WAL) assignSequenceNumbers(shardId uint32, request *protocol.Request) {
if request.Series == nil {
if len(request.MultiSeries) == 0 {
return
}
sequenceNumber := self.state.getCurrentSequenceNumber(shardId)
for _, p := range request.Series.Points {
for _, s := range request.MultiSeries {
for _, p := range s.Points {
if p.SequenceNumber != nil {
continue
}
@ -276,6 +277,7 @@ func (self *WAL) assignSequenceNumbers(shardId uint32, request *protocol.Request
}
self.state.setCurrentSequenceNumber(shardId, sequenceNumber)
}
}
func (self *WAL) processAppendEntry(e *appendEntry) {
nextRequestNumber := self.state.getNextRequestNumber()
@ -441,14 +443,12 @@ func (self *WAL) recover() error {
return err
}
var points []*protocol.Point
if s := replayRequest.request.Series; s != nil {
points = s.Points
}
for _, point := range points {
for _, s := range replayRequest.request.MultiSeries {
for _, point := range s.Points {
sequenceNumber := (point.GetSequenceNumber() - uint64(self.serverId)) / HOST_ID_OFFSET
self.state.recover(replayRequest.shardId, sequenceNumber)
}
}
if firstOffset == -1 {
firstOffset = replayRequest.startOffset

View File

@ -51,13 +51,17 @@ func generateSeries(numberOfPoints int) *protocol.Series {
}
}
func generateSerieses(numberOfPoints int) []*protocol.Series {
return []*protocol.Series{generateSeries(numberOfPoints)}
}
func generateRequest(numberOfPoints int) *protocol.Request {
requestType := protocol.Request_WRITE
return &protocol.Request{
Id: proto.Uint32(1),
Database: proto.String("db"),
Type: &requestType,
Series: generateSeries(numberOfPoints),
MultiSeries: generateSerieses(numberOfPoints),
}
}
@ -193,7 +197,7 @@ func (_ *WalSuite) TestSequenceNumberRecovery(c *C) {
id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(id, Equals, uint32(1))
c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, 2*HOST_ID_OFFSET+uint64(serverId))
c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, 2*HOST_ID_OFFSET+uint64(serverId))
wal.closeWithoutBookmarking()
wal, err = NewWAL(wal.config)
wal.SetServerId(1)
@ -202,7 +206,7 @@ func (_ *WalSuite) TestSequenceNumberRecovery(c *C) {
request = generateRequest(2)
id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, 4*HOST_ID_OFFSET+uint64(serverId))
c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, 4*HOST_ID_OFFSET+uint64(serverId))
}
func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) {
@ -270,7 +274,7 @@ func (_ *WalSuite) TestReplay(c *C) {
})
c.Assert(err, IsNil)
c.Assert(requests, HasLen, 1)
c.Assert(requests[0].Series.Points, HasLen, 3)
c.Assert(requests[0].MultiSeries[0].Points, HasLen, 3)
c.Assert(*requests[0].RequestNumber, Equals, uint32(3))
c.Assert(err, IsNil)
}
@ -505,13 +509,13 @@ func (_ *WalSuite) TestSequenceNumberAssignment(c *C) {
request := generateRequest(2)
_, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(request.Series.Points[0].GetSequenceNumber(), Equals, uint64(1*HOST_ID_OFFSET+1))
c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, uint64(2*HOST_ID_OFFSET+1))
c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Equals, uint64(1*HOST_ID_OFFSET+1))
c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, uint64(2*HOST_ID_OFFSET+1))
request = generateRequest(2)
_, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(request.Series.Points[0].GetSequenceNumber(), Equals, uint64(3*HOST_ID_OFFSET+1))
c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, uint64(4*HOST_ID_OFFSET+1))
c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Equals, uint64(3*HOST_ID_OFFSET+1))
c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, uint64(4*HOST_ID_OFFSET+1))
}
func (_ *WalSuite) TestSequenceNumberAssignmentPerServer(c *C) {
@ -526,5 +530,5 @@ func (_ *WalSuite) TestSequenceNumberAssignmentPerServer(c *C) {
anotherRequest := generateRequest(1)
_, err = anotherWal.AssignSequenceNumbersAndLog(anotherRequest, &MockShard{id: 1})
c.Assert(err, IsNil)
c.Assert(request.Series.Points[0].GetSequenceNumber(), Not(Equals), anotherRequest.Series.Points[0].GetSequenceNumber())
c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Not(Equals), anotherRequest.MultiSeries[0].Points[0].GetSequenceNumber())
}