From 4acdc7bf34c6350e3d234aadd0e7fcc75285eb83 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Mon, 7 Apr 2014 16:30:52 -0400 Subject: [PATCH] fix #310. Request should support multiple timeseries --- src/api/graphite/api.go | 8 +- src/api/http/api.go | 14 ++-- src/api/http/api_test.go | 6 +- src/cluster/shard.go | 2 +- src/coordinator/client_server_test.go | 4 +- src/coordinator/coordinator.go | 96 ++++++++++++------------ src/coordinator/coordinator_test.go | 2 +- src/coordinator/interface.go | 2 +- src/datastore/leveldb_shard_datastore.go | 8 +- src/protocol/protocol.proto | 2 +- src/wal/wal.go | 28 +++---- src/wal/wal_test.go | 28 ++++--- 12 files changed, 110 insertions(+), 90 deletions(-) diff --git a/src/api/graphite/api.go b/src/api/graphite/api.go index e3ff416a69..13502016fb 100644 --- a/src/api/graphite/api.go +++ b/src/api/graphite/api.go @@ -15,13 +15,14 @@ package graphite import ( "bufio" "cluster" - log "code.google.com/p/log4go" . "common" "configuration" "coordinator" "net" "protocol" "time" + + log "code.google.com/p/log4go" ) type Server struct { @@ -95,13 +96,14 @@ func (self *Server) Close() { } func (self *Server) writePoints(series *protocol.Series) error { - err := self.coordinator.WriteSeriesData(self.user, self.database, series) + serie := []*protocol.Series{series} + err := self.coordinator.WriteSeriesData(self.user, self.database, serie) if err != nil { switch err.(type) { case AuthorizationError: // user information got stale, get a fresh one (this should happen rarely) self.getAuth() - err = self.coordinator.WriteSeriesData(self.user, self.database, series) + err = self.coordinator.WriteSeriesData(self.user, self.database, serie) if err != nil { log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error()) } diff --git a/src/api/http/api.go b/src/api/http/api.go index 6fd3912902..7ab163ca7b 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -331,6 +331,7 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request } // convert the wire format to the internal representation of the time series + dataStoreSeries := make([]*protocol.Series, 0, len(serializedSeries)) for _, s := range serializedSeries { if len(s.Points) == 0 { continue @@ -341,12 +342,15 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request return libhttp.StatusBadRequest, err.Error() } - err = self.coordinator.WriteSeriesData(user, db, series) - - if err != nil { - return errorToStatusCode(err), err.Error() - } + dataStoreSeries = append(dataStoreSeries, series) } + + err = self.coordinator.WriteSeriesData(user, db, dataStoreSeries) + + if err != nil { + return errorToStatusCode(err), err.Error() + } + return libhttp.StatusOK, nil }) } diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 36f6e6e691..af098470f1 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - . "launchpad.net/gocheck" "net" libhttp "net/http" "net/url" @@ -17,6 +16,7 @@ import ( "protocol" "testing" "time" + . "launchpad.net/gocheck" ) // Hook up gocheck into the gotest runner. @@ -101,8 +101,8 @@ type MockCoordinator struct { returnedError error } -func (self *MockCoordinator) WriteSeriesData(_ User, db string, series *protocol.Series) error { - self.series = append(self.series, series) +func (self *MockCoordinator) WriteSeriesData(_ User, db string, series []*protocol.Series) error { + self.series = append(self.series, series...) return nil } diff --git a/src/cluster/shard.go b/src/cluster/shard.go index a1c430ae6e..03012ad8c4 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -190,7 +190,7 @@ func (self *ShardData) Write(request *p.Request) error { } for _, server := range self.clusterServers { // we have to create a new reqeust object because the ID gets assigned on each server. - requestWithoutId := &p.Request{Type: request.Type, Database: request.Database, Series: request.Series, ShardId: &self.id, RequestNumber: request.RequestNumber} + requestWithoutId := &p.Request{Type: request.Type, Database: request.Database, MultiSeries: request.MultiSeries, ShardId: &self.id, RequestNumber: request.RequestNumber} server.BufferWrite(requestWithoutId) } return nil diff --git a/src/coordinator/client_server_test.go b/src/coordinator/client_server_test.go index f407751440..780b751af5 100644 --- a/src/coordinator/client_server_test.go +++ b/src/coordinator/client_server_test.go @@ -3,10 +3,10 @@ package coordinator import ( "encoding/binary" "fmt" - . "launchpad.net/gocheck" "net" "protocol" "time" + . "launchpad.net/gocheck" ) type ClientServerSuite struct{} @@ -54,7 +54,7 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) { id := uint32(1) database := "pauldb" proxyWrite := protocol.Request_WRITE - request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, Series: series} + request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, MultiSeries: []*protocol.Series{series}} time.Sleep(time.Second * 1) err := protobufClient.MakeRequest(request, responseStream) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index c8c673f248..8b5da25520 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -442,20 +442,19 @@ func (self *CoordinatorImpl) ForceCompaction(user common.User) error { return self.raftServer.ForceLogCompaction() } -func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error { +func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series []*protocol.Series) error { if !user.HasWriteAccess(db) { return common.NewAuthorizationError("Insufficient permissions to write to %s", db) } - if len(series.Points) == 0 { - return fmt.Errorf("Can't write series with zero points.") - } err := self.CommitSeriesData(db, series) if err != nil { return err } - self.ProcessContinuousQueries(db, series) + for _, s := range series { + self.ProcessContinuousQueries(db, s) + } return err } @@ -529,7 +528,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, } newSeries := &protocol.Series{Name: &cleanedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}} - if e := self.CommitSeriesData(db, newSeries); e != nil { + if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil { log.Error("Couldn't write data for continuous query: ", e) } } @@ -544,7 +543,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, } } - if e := self.CommitSeriesData(db, newSeries); e != nil { + if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil { log.Error("Couldn't write data for continuous query: ", e) } } @@ -552,69 +551,74 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, return nil } -func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error { - lastPointIndex := 0 +func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Series) error { now := common.CurrentTime() - var shardToWrite cluster.Shard - for _, point := range series.Points { - if point.Timestamp == nil { - point.Timestamp = &now + + shardToSerieses := map[uint32]map[string]*protocol.Series{} + shardIdToShard := map[uint32]*cluster.ShardData{} + + for _, series := range serieses { + if len(series.Points) == 0 { + return fmt.Errorf("Can't write series with zero points.") } - } - lastTime := int64(math.MinInt64) - if len(series.Points) > 0 && *series.Points[0].Timestamp == lastTime { - // just a hack to make sure lastTime will never equal the first - // point's timestamp - lastTime = 0 - } + for _, point := range series.Points { + if point.Timestamp == nil { + point.Timestamp = &now + } + } - // sort the points by timestamp - series.SortPointsTimeDescending() + // sort the points by timestamp + series.SortPointsTimeDescending() - for i, point := range series.Points { - if *point.Timestamp != lastTime { - shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *point.Timestamp) + for i := 0; i < len(series.Points); { + shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, series.GetName(), series.Points[i].GetTimestamp()) if err != nil { return err } - if shardToWrite == nil { - shardToWrite = shard - } else if shardToWrite.Id() != shard.Id() { - newIndex := i - newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[lastPointIndex:newIndex]} - if err := self.write(db, newSeries, shardToWrite); err != nil { - return err - } - lastPointIndex = newIndex - shardToWrite = shard + firstIndex := i + timestamp := series.Points[i].GetTimestamp() + for ; i < len(series.Points) && series.Points[i].GetTimestamp() == timestamp; i++ { + // add all points with the same timestamp } - lastTime = *point.Timestamp + newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[firstIndex:i:i]} + + shardIdToShard[shard.Id()] = shard + shardSerieses := shardToSerieses[shard.Id()] + if shardSerieses == nil { + shardSerieses = map[string]*protocol.Series{} + shardToSerieses[shard.Id()] = shardSerieses + } + seriesName := series.GetName() + s := shardSerieses[seriesName] + if s == nil { + shardSerieses[seriesName] = newSeries + continue + } + s.Points = append(s.Points, newSeries.Points...) } } - series.Points = series.Points[lastPointIndex:] + for id, serieses := range shardToSerieses { + shard := shardIdToShard[id] - if len(series.Points) > 0 { - if shardToWrite == nil { - shardToWrite, _ = self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *series.Points[0].Timestamp) + seriesesSlice := make([]*protocol.Series, 0, len(serieses)) + for _, s := range serieses { + seriesesSlice = append(seriesesSlice, s) } - err := self.write(db, series, shardToWrite) - + err := self.write(db, seriesesSlice, shard) if err != nil { log.Error("COORD error writing: ", err) return err } - - return err } return nil } -func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard cluster.Shard) error { - request := &protocol.Request{Type: &write, Database: &db, Series: series} +func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard) error { + request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series} return shard.Write(request) } diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 824223b7f0..7f414beef5 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -629,7 +629,7 @@ func (self *CoordinatorSuite) TestCheckReadAccess(c *C) { user := &MockUser{ dbCannotWrite: map[string]bool{"foo": true}, } - err := coordinator.WriteSeriesData(user, "foo", series) + err := coordinator.WriteSeriesData(user, "foo", []*protocol.Series{series}) c.Assert(err, ErrorMatches, ".*Insufficient permission.*") } diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 9e4381af85..465bc902e1 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -16,7 +16,7 @@ type Coordinator interface { // for all the data points that are returned // 4. The end of a time series is signaled by returning a series with no data points // 5. TODO: Aggregation on the nodes - WriteSeriesData(user common.User, db string, series *protocol.Series) error + WriteSeriesData(user common.User, db string, series []*protocol.Series) error DropDatabase(user common.User, db string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ForceCompaction(user common.User) error diff --git a/src/datastore/leveldb_shard_datastore.go b/src/datastore/leveldb_shard_datastore.go index ca5edb38d9..085dd6fa94 100644 --- a/src/datastore/leveldb_shard_datastore.go +++ b/src/datastore/leveldb_shard_datastore.go @@ -161,7 +161,13 @@ func (self *LevelDbShardDatastore) Write(request *protocol.Request) error { return err } defer self.ReturnShard(*request.ShardId) - return shardDb.Write(*request.Database, request.Series) + for _, s := range request.MultiSeries { + err := shardDb.Write(*request.Database, s) + if err != nil { + return err + } + } + return nil } func (self *LevelDbShardDatastore) BufferWrite(request *protocol.Request) { diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index d4ae863e0d..bf78e8d3dc 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -35,9 +35,9 @@ message Request { optional uint32 id = 1; required Type type = 2; required string database = 3; - optional Series series = 4; // only write and delete requests get sequenceNumbers assigned. These are used to // ensure that the receiving server is up to date + repeated Series multi_series = 4; optional uint64 sequence_number = 5; optional uint32 shard_id = 6; optional string query = 7; diff --git a/src/wal/wal.go b/src/wal/wal.go index ba5e668c0a..8bf9e9ff9f 100644 --- a/src/wal/wal.go +++ b/src/wal/wal.go @@ -263,18 +263,20 @@ func (self *WAL) processEntries() { } func (self *WAL) assignSequenceNumbers(shardId uint32, request *protocol.Request) { - if request.Series == nil { + if len(request.MultiSeries) == 0 { return } sequenceNumber := self.state.getCurrentSequenceNumber(shardId) - for _, p := range request.Series.Points { - if p.SequenceNumber != nil { - continue + for _, s := range request.MultiSeries { + for _, p := range s.Points { + if p.SequenceNumber != nil { + continue + } + sequenceNumber++ + p.SequenceNumber = proto.Uint64(sequenceNumber*HOST_ID_OFFSET + uint64(self.serverId)) } - sequenceNumber++ - p.SequenceNumber = proto.Uint64(sequenceNumber*HOST_ID_OFFSET + uint64(self.serverId)) + self.state.setCurrentSequenceNumber(shardId, sequenceNumber) } - self.state.setCurrentSequenceNumber(shardId, sequenceNumber) } func (self *WAL) processAppendEntry(e *appendEntry) { @@ -441,13 +443,11 @@ func (self *WAL) recover() error { return err } - var points []*protocol.Point - if s := replayRequest.request.Series; s != nil { - points = s.Points - } - for _, point := range points { - sequenceNumber := (point.GetSequenceNumber() - uint64(self.serverId)) / HOST_ID_OFFSET - self.state.recover(replayRequest.shardId, sequenceNumber) + for _, s := range replayRequest.request.MultiSeries { + for _, point := range s.Points { + sequenceNumber := (point.GetSequenceNumber() - uint64(self.serverId)) / HOST_ID_OFFSET + self.state.recover(replayRequest.shardId, sequenceNumber) + } } if firstOffset == -1 { diff --git a/src/wal/wal_test.go b/src/wal/wal_test.go index 63fc2865cc..95ec3c8f8b 100644 --- a/src/wal/wal_test.go +++ b/src/wal/wal_test.go @@ -51,13 +51,17 @@ func generateSeries(numberOfPoints int) *protocol.Series { } } +func generateSerieses(numberOfPoints int) []*protocol.Series { + return []*protocol.Series{generateSeries(numberOfPoints)} +} + func generateRequest(numberOfPoints int) *protocol.Request { requestType := protocol.Request_WRITE return &protocol.Request{ - Id: proto.Uint32(1), - Database: proto.String("db"), - Type: &requestType, - Series: generateSeries(numberOfPoints), + Id: proto.Uint32(1), + Database: proto.String("db"), + Type: &requestType, + MultiSeries: generateSerieses(numberOfPoints), } } @@ -193,7 +197,7 @@ func (_ *WalSuite) TestSequenceNumberRecovery(c *C) { id, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) c.Assert(err, IsNil) c.Assert(id, Equals, uint32(1)) - c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, 2*HOST_ID_OFFSET+uint64(serverId)) + c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, 2*HOST_ID_OFFSET+uint64(serverId)) wal.closeWithoutBookmarking() wal, err = NewWAL(wal.config) wal.SetServerId(1) @@ -202,7 +206,7 @@ func (_ *WalSuite) TestSequenceNumberRecovery(c *C) { request = generateRequest(2) id, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) c.Assert(err, IsNil) - c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, 4*HOST_ID_OFFSET+uint64(serverId)) + c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, 4*HOST_ID_OFFSET+uint64(serverId)) } func (_ *WalSuite) TestAutoBookmarkAfterRecovery(c *C) { @@ -270,7 +274,7 @@ func (_ *WalSuite) TestReplay(c *C) { }) c.Assert(err, IsNil) c.Assert(requests, HasLen, 1) - c.Assert(requests[0].Series.Points, HasLen, 3) + c.Assert(requests[0].MultiSeries[0].Points, HasLen, 3) c.Assert(*requests[0].RequestNumber, Equals, uint32(3)) c.Assert(err, IsNil) } @@ -505,13 +509,13 @@ func (_ *WalSuite) TestSequenceNumberAssignment(c *C) { request := generateRequest(2) _, err := wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) c.Assert(err, IsNil) - c.Assert(request.Series.Points[0].GetSequenceNumber(), Equals, uint64(1*HOST_ID_OFFSET+1)) - c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, uint64(2*HOST_ID_OFFSET+1)) + c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Equals, uint64(1*HOST_ID_OFFSET+1)) + c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, uint64(2*HOST_ID_OFFSET+1)) request = generateRequest(2) _, err = wal.AssignSequenceNumbersAndLog(request, &MockShard{id: 1}) c.Assert(err, IsNil) - c.Assert(request.Series.Points[0].GetSequenceNumber(), Equals, uint64(3*HOST_ID_OFFSET+1)) - c.Assert(request.Series.Points[1].GetSequenceNumber(), Equals, uint64(4*HOST_ID_OFFSET+1)) + c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Equals, uint64(3*HOST_ID_OFFSET+1)) + c.Assert(request.MultiSeries[0].Points[1].GetSequenceNumber(), Equals, uint64(4*HOST_ID_OFFSET+1)) } func (_ *WalSuite) TestSequenceNumberAssignmentPerServer(c *C) { @@ -526,5 +530,5 @@ func (_ *WalSuite) TestSequenceNumberAssignmentPerServer(c *C) { anotherRequest := generateRequest(1) _, err = anotherWal.AssignSequenceNumbersAndLog(anotherRequest, &MockShard{id: 1}) c.Assert(err, IsNil) - c.Assert(request.Series.Points[0].GetSequenceNumber(), Not(Equals), anotherRequest.Series.Points[0].GetSequenceNumber()) + c.Assert(request.MultiSeries[0].Points[0].GetSequenceNumber(), Not(Equals), anotherRequest.MultiSeries[0].Points[0].GetSequenceNumber()) }