Wire up replicated writes, creation of shards across cluster.

pull/249/head
Paul Dix 2014-02-16 10:52:02 -05:00
parent a76c63e389
commit bf139380cb
6 changed files with 69 additions and 9 deletions

View File

@ -79,6 +79,8 @@ type ClusterConfiguration struct {
shardCreator ShardCreator
shardLock sync.Mutex
lastShardId uint32
shardsById map[uint32]*ShardData
shardsByIdLock sync.RWMutex
}
type ContinuousQuery struct {
@ -111,6 +113,7 @@ func NewClusterConfiguration(
longTermShards: make([]*ShardData, 0),
shortTermShards: make([]*ShardData, 0),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
shardsById: make(map[uint32]*ShardData, 0),
}
}
@ -863,8 +866,8 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
}
for _, newShard := range shards {
self.lastShardId += uint32(1)
shard := NewShard(self.LocalServerId, newShard.StartTime, newShard.EndTime, shardType, self.wal)
id := atomic.AddUint32(&self.lastShardId, uint32(1))
shard := NewShard(id, newShard.StartTime, newShard.EndTime, shardType, self.wal)
servers := make([]*ClusterServer, 0)
for _, serverId := range newShard.ServerIds {
if serverId == self.LocalServerId {
@ -879,6 +882,10 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
}
shard.SetServers(servers)
self.shardsByIdLock.Lock()
self.shardsById[shard.id] = shard
self.shardsByIdLock.Unlock()
message := "Adding long term shard"
if newShard.Type == LONG_TERM {
self.longTermShards = append(self.longTermShards, shard)
@ -931,3 +938,20 @@ func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewS
fmt.Println("MarshalNewShardArray DONE!")
return shards, nil
}
// This function is for the request handler to get the shard to write a
// request to locally.
func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData {
self.shardsByIdLock.RLock()
defer self.shardsByIdLock.RUnlock()
shard := self.shardsById[id]
// If it's nil it just means that it hasn't been replicated by Raft yet.
// Just create a fake local shard temporarily for the write.
if shard == nil {
shard = NewShard(id, time.Now(), time.Now(), LONG_TERM, self.wal)
shard.SetServers([]*ClusterServer{})
shard.SetLocalStore(self.shardStore, self.LocalServerId)
}
return shard
}

View File

@ -175,6 +175,8 @@ func (self *ShardData) ServerIds() []uint32 {
}
func (self *ShardData) Write(request *protocol.Request) error {
fmt.Println("SHARD Write: ", self.id, request)
request.ShardId = &self.id
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
if err != nil {
return err
@ -189,6 +191,18 @@ func (self *ShardData) Write(request *protocol.Request) error {
return nil
}
func (self *ShardData) WriteLocalOnly(request *protocol.Request) error {
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
if err != nil {
return err
}
request.RequestNumber = &requestNumber
if self.store != nil {
self.localWrites <- request
}
return nil
}
func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
if self.localShard != nil {
processor := engine.NewQueryEngine(querySpec.SelectQuery(), response)
@ -229,6 +243,7 @@ func (self *ShardData) handleWritesToServer(server *ClusterServer, writeBuffer c
server.MakeRequest(request, responseStream)
response := <-responseStream
if *response.Type == protocol.Response_WRITE_OK {
fmt.Println("COMMIT!")
self.wal.Commit(requestNumber, server)
} else {
// TODO: retry logic for failed request

View File

@ -61,7 +61,6 @@ type ShardConfiguration struct {
}
func (self *ShardConfiguration) ParseAndValidate() error {
fmt.Println("ParseAndValidate...")
var err error
if self.Split == 0 {
self.Split = 1
@ -76,12 +75,10 @@ func (self *ShardConfiguration) ParseAndValidate() error {
}
}
if self.Duration == "" {
fmt.Println("ParseAndValidate duration was empty")
self.parsedDuration = time.Hour * 24 * 7
return nil
}
self.parsedDuration, err = time.ParseDuration(self.Duration)
fmt.Println("ParseAndValidate: split, random, duration", self.Split, self.SplitRandom, self.Duration, self.ParsedDuration().Seconds())
return err
}
@ -141,7 +138,6 @@ func LoadConfiguration(fileName string) *Configuration {
}
func parseTomlConfiguration(filename string) (*Configuration, error) {
fmt.Println("parseTomlConfigration: ", filename)
body, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
@ -151,7 +147,6 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
if err != nil {
return nil, err
}
fmt.Println("TOML: ", filename)
err = tomlConfiguration.Sharding.LongTerm.ParseAndValidate()
if err != nil {
return nil, err

View File

@ -33,7 +33,17 @@ func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator,
}
func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
if *request.Type == protocol.Request_PROXY_WRITE {
if *request.Type == protocol.Request_WRITE {
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
fmt.Println("HANDLE: ", shard)
err := shard.WriteLocalOnly(request)
if err != nil {
log.Error("ProtobufRequestHandler: error writing local shard: ", err)
return err
}
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
return self.WriteResponse(conn, response)
} else if *request.Type == protocol.Request_PROXY_WRITE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
request.OriginatingServerId = &self.clusterConfig.LocalServerId

View File

@ -253,6 +253,21 @@ func (self *ServerSuite) TestLimitQueryOnSingleShard(c *C) {
c.Assert(series.GetValueForPointAndColumn(1, "value", c).(float64), Equals, float64(10))
}
func (self *ServerSuite) TestQueryAgainstMultipleShards(c *C) {
data := `[{"points": [[4], [10], [5]], "name": "test_query_against_multiple_shards", "columns": ["value"]}]`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
t := (time.Now().Unix() - 3600) * 1000
data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_query_against_multiple_shards", "columns": ["value", "time"]}]`, t)
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
time.Sleep(time.Second * 2)
collection := self.serverProcesses[0].Query("test_rep", "select count(value) from test_query_against_multiple_shards group by time(1h)", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_query_against_multiple_shards", c)
c.Assert(series.Points, HasLen, 2)
c.Assert(series.GetValueForPointAndColumn(0, "count", c).(float64), Equals, float64(3))
c.Assert(series.GetValueForPointAndColumn(1, "count", c).(float64), Equals, float64(1))
}
func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
data := `
[{

View File

@ -49,10 +49,11 @@ message Request {
// only write and delete requests get sequenceNumbers assigned. These are used to
// ensure that the receiving server is up to date
optional uint64 sequence_number = 5;
optional uint32 shard_id = 6;
// the originzatingServerId is only used for writes and deletes. It is the id of the
// server that first committed the write to its local datastore. It is used for
// the other servers in the hash ring to ensure they remain consistent.
optional uint32 originating_server_id = 6;
optional uint32 originating_server_id = 21;
optional uint32 cluster_version = 10;
optional string query = 7;
optional string user_name = 8;