diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go
index 082a3e546c..0726007d06 100644
--- a/src/cluster/cluster_configuration.go
+++ b/src/cluster/cluster_configuration.go
@@ -79,6 +79,8 @@ type ClusterConfiguration struct {
     shardCreator   ShardCreator
     shardLock      sync.Mutex
     lastShardId    uint32
+    shardsById     map[uint32]*ShardData
+    shardsByIdLock sync.RWMutex
 }
 
 type ContinuousQuery struct {
@@ -111,6 +113,7 @@ func NewClusterConfiguration(
         longTermShards:  make([]*ShardData, 0),
         shortTermShards: make([]*ShardData, 0),
         random:          rand.New(rand.NewSource(time.Now().UnixNano())),
+        shardsById:      make(map[uint32]*ShardData, 0),
     }
 }
 
@@ -863,8 +866,8 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
     }
 
     for _, newShard := range shards {
-        self.lastShardId += uint32(1)
-        shard := NewShard(self.LocalServerId, newShard.StartTime, newShard.EndTime, shardType, self.wal)
+        id := atomic.AddUint32(&self.lastShardId, uint32(1))
+        shard := NewShard(id, newShard.StartTime, newShard.EndTime, shardType, self.wal)
         servers := make([]*ClusterServer, 0)
         for _, serverId := range newShard.ServerIds {
             if serverId == self.LocalServerId {
@@ -879,6 +882,10 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
         }
         shard.SetServers(servers)
 
+        self.shardsByIdLock.Lock()
+        self.shardsById[shard.id] = shard
+        self.shardsByIdLock.Unlock()
+
         message := "Adding long term shard"
         if newShard.Type == LONG_TERM {
             self.longTermShards = append(self.longTermShards, shard)
@@ -931,3 +938,20 @@ func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewS
     fmt.Println("MarshalNewShardArray DONE!")
     return shards, nil
 }
+
+// This function is for the request handler to get the shard to write a
+// request to locally.
+func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData {
+    self.shardsByIdLock.RLock()
+    defer self.shardsByIdLock.RUnlock()
+    shard := self.shardsById[id]
+
+    // If it's nil it just means that it hasn't been replicated by Raft yet.
+    // Just create a fake local shard temporarily for the write.
+    if shard == nil {
+        shard = NewShard(id, time.Now(), time.Now(), LONG_TERM, self.wal)
+        shard.SetServers([]*ClusterServer{})
+        shard.SetLocalStore(self.shardStore, self.LocalServerId)
+    }
+    return shard
+}
diff --git a/src/cluster/shard.go b/src/cluster/shard.go
index 901af60271..8b3e21489c 100644
--- a/src/cluster/shard.go
+++ b/src/cluster/shard.go
@@ -175,6 +175,8 @@ func (self *ShardData) ServerIds() []uint32 {
 }
 
 func (self *ShardData) Write(request *protocol.Request) error {
+    fmt.Println("SHARD Write: ", self.id, request)
+    request.ShardId = &self.id
     requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
     if err != nil {
         return err
@@ -189,6 +191,18 @@ func (self *ShardData) Write(request *protocol.Request) error {
     return nil
 }
 
+func (self *ShardData) WriteLocalOnly(request *protocol.Request) error {
+    requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
+    if err != nil {
+        return err
+    }
+    request.RequestNumber = &requestNumber
+    if self.store != nil {
+        self.localWrites <- request
+    }
+    return nil
+}
+
 func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
     if self.localShard != nil {
         processor := engine.NewQueryEngine(querySpec.SelectQuery(), response)
@@ -229,6 +243,7 @@ func (self *ShardData) handleWritesToServer(server *ClusterServer, writeBuffer c
         server.MakeRequest(request, responseStream)
         response := <-responseStream
         if *response.Type == protocol.Response_WRITE_OK {
+            fmt.Println("COMMIT!")
             self.wal.Commit(requestNumber, server)
         } else {
             // TODO: retry logic for failed request
diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go
index 66c38afae0..0d8f16b7e7 100644
--- a/src/configuration/configuration.go
+++ b/src/configuration/configuration.go
@@ -61,7 +61,6 @@ type ShardConfiguration struct {
 }
 
 func (self *ShardConfiguration) ParseAndValidate() error {
-    fmt.Println("ParseAndValidate...")
     var err error
     if self.Split == 0 {
         self.Split = 1
@@ -76,12 +75,10 @@ func (self *ShardConfiguration) ParseAndValidate() error {
         }
     }
     if self.Duration == "" {
-        fmt.Println("ParseAndValidate duration was empty")
         self.parsedDuration = time.Hour * 24 * 7
         return nil
     }
     self.parsedDuration, err = time.ParseDuration(self.Duration)
-    fmt.Println("ParseAndValidate: split, random, duration", self.Split, self.SplitRandom, self.Duration, self.ParsedDuration().Seconds())
     return err
 }
 
@@ -141,7 +138,6 @@ func LoadConfiguration(fileName string) *Configuration {
 }
 
 func parseTomlConfiguration(filename string) (*Configuration, error) {
-    fmt.Println("parseTomlConfigration: ", filename)
     body, err := ioutil.ReadFile(filename)
     if err != nil {
         return nil, err
@@ -151,7 +147,6 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
     if err != nil {
         return nil, err
     }
-    fmt.Println("TOML: ", filename)
     err = tomlConfiguration.Sharding.LongTerm.ParseAndValidate()
     if err != nil {
         return nil, err
diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go
index c60ce80572..ee5a19bb3a 100644
--- a/src/coordinator/protobuf_request_handler.go
+++ b/src/coordinator/protobuf_request_handler.go
@@ -33,7 +33,17 @@ func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator,
 }
 
 func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
-    if *request.Type == protocol.Request_PROXY_WRITE {
+    if *request.Type == protocol.Request_WRITE {
+        shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
+        fmt.Println("HANDLE: ", shard)
+        err := shard.WriteLocalOnly(request)
+        if err != nil {
+            log.Error("ProtobufRequestHandler: error writing local shard: ", err)
+            return err
+        }
+        response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
+        return self.WriteResponse(conn, response)
+    } else if *request.Type == protocol.Request_PROXY_WRITE {
         response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
 
         request.OriginatingServerId = &self.clusterConfig.LocalServerId
diff --git a/src/integration/server_test.go b/src/integration/server_test.go
index 43560a9b5c..44fa7a7982 100644
--- a/src/integration/server_test.go
+++ b/src/integration/server_test.go
@@ -253,6 +253,21 @@ func (self *ServerSuite) TestLimitQueryOnSingleShard(c *C) {
     c.Assert(series.GetValueForPointAndColumn(1, "value", c).(float64), Equals, float64(10))
 }
 
+func (self *ServerSuite) TestQueryAgainstMultipleShards(c *C) {
+    data := `[{"points": [[4], [10], [5]], "name": "test_query_against_multiple_shards", "columns": ["value"]}]`
+    self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
+    t := (time.Now().Unix() - 3600) * 1000
+    data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_query_against_multiple_shards", "columns": ["value", "time"]}]`, t)
+    self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
+    time.Sleep(time.Second * 2)
+    collection := self.serverProcesses[0].Query("test_rep", "select count(value) from test_query_against_multiple_shards group by time(1h)", false, c)
+    c.Assert(collection.Members, HasLen, 1)
+    series := collection.GetSeries("test_query_against_multiple_shards", c)
+    c.Assert(series.Points, HasLen, 2)
+    c.Assert(series.GetValueForPointAndColumn(0, "count", c).(float64), Equals, float64(3))
+    c.Assert(series.GetValueForPointAndColumn(1, "count", c).(float64), Equals, float64(1))
+}
+
 func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
     data := `
 [{
diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto
index 8bfe675c19..6bd870a373 100644
--- a/src/protocol/protocol.proto
+++ b/src/protocol/protocol.proto
@@ -49,10 +49,11 @@ message Request {
     // only write and delete requests get sequenceNumbers assigned. These are used to
     // ensure that the receiving server is up to date
     optional uint64 sequence_number = 5;
+    optional uint32 shard_id = 6;
     // the originatingServerId is only used for writes and deletes. It is the id of the
     // server that first committed the write to its local datastore. It is used for
     // the other servers in the hash ring to ensure they remain consistent.
-    optional uint32 originating_server_id = 6;
+    optional uint32 originating_server_id = 21;
     optional uint32 cluster_version = 10;
     optional string query = 7;
     optional string user_name = 8;
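
Taken together, shardsById, shardsByIdLock, and GetLocalShardById form a read-mostly registry: writers publish shards under the write lock, lookups take only the read lock, and a miss is papered over with a throwaway shard so the write can proceed before Raft has replicated the real one. Below is a minimal, runnable sketch of that pattern; the names (registry, shard, add, get) are hypothetical stand-ins for ClusterConfiguration and ShardData, not code from this patch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shard stands in for cluster.ShardData; the real type also carries
// start/end times, a server list, and a WAL handle.
type shard struct{ id uint32 }

// registry stands in for the shardsById/shardsByIdLock pair on
// ClusterConfiguration.
type registry struct {
	lastId uint32
	lock   sync.RWMutex
	byId   map[uint32]*shard
}

// add mirrors AddShards: allocate the id atomically, then publish the
// shard to the index under the write lock.
func (r *registry) add() *shard {
	id := atomic.AddUint32(&r.lastId, 1)
	s := &shard{id: id}
	r.lock.Lock()
	r.byId[id] = s
	r.lock.Unlock()
	return s
}

// get mirrors GetLocalShardById: lookups take only the read lock, and a
// miss returns a stand-in shard instead of failing the caller.
func (r *registry) get(id uint32) *shard {
	r.lock.RLock()
	defer r.lock.RUnlock()
	if s, ok := r.byId[id]; ok {
		return s
	}
	return &shard{id: id} // placeholder until the real shard is replicated
}

func main() {
	r := &registry{byId: make(map[uint32]*shard)}
	s := r.add()
	fmt.Println(r.get(s.id).id, r.get(99).id) // prints: 1 99
}
```

One caveat the sketch inherits from the patch: the placeholder is never inserted into the map (GetLocalShardById holds only the read lock), so every miss for the same id allocates a fresh fake shard.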
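On the protocol change: shard_id takes the freed-up tag 6 while originating_server_id moves out to 21. Protobuf field numbers 1 through 15 encode in a single key byte, so giving the low tag to the field now stamped on every write keeps it cheapest on the wire; that motivation is an inference from standard protobuf encoding rules, not something the patch states. A small sketch of the key arithmetic:

```go
package main

import "fmt"

// A protobuf field key is varint(field_number<<3 | wire_type), so field
// numbers 1-15 fit in one key byte and 16-2047 need two.
func main() {
	key := func(fieldNumber uint32) uint32 {
		const varintWireType = 0 // both fields are varint-encoded uint32s
		return fieldNumber<<3 | varintWireType
	}
	fmt.Println("shard_id key:", key(6))               // 48: one varint byte
	fmt.Println("originating_server_id key:", key(21)) // 168: two varint bytes (>127)
}
```

Worth noting that renumbering an existing field is a wire-format break: a reader built against the old definition treats tag 21 as an unknown field and loses originating_server_id, so this only works while all cluster nodes are upgraded together.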