Wire up logic to get shards for a query.

pull/249/head
Paul Dix 2014-02-16 12:28:00 -05:00
parent bf139380cb
commit 3465de85d5
4 changed files with 72 additions and 8 deletions

View File

@ -27,6 +27,7 @@ type QuerySpec interface {
TableNames() []string
GetGroupByInterval() *time.Duration
IsRegex() bool
ShouldQueryShortTermAndLongTerm() (shouldQueryShortTerm bool, shouldQueryLongTerm bool)
}
type WAL interface {
@ -705,7 +706,7 @@ func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interf
return jsonObject
}
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (Shard, error) {
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error) {
shards := self.shortTermShards
// split := self.config.ShortTermShard.Split
hasRandomSplit := self.config.ShortTermShard.HasRandomSplit()
@ -782,7 +783,9 @@ func (self *ClusterConfiguration) createShards(microsecondsEpoch int64, shardTyp
if startIndex >= len(self.servers) {
startIndex = 0
}
serverIds = append(serverIds, self.servers[startIndex].Id())
server := self.servers[startIndex]
self.lastServerToGetShard = server
serverIds = append(serverIds, server.Id())
startIndex += 1
}
shards = append(shards, &NewShardData{StartTime: *startTime, EndTime: *endTime, ServerIds: serverIds, Type: shardType})
@ -810,11 +813,55 @@ func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoc
return &startTime, &endTime
}
func (self *ClusterConfiguration) GetShards(querySpec QuerySpec) []Shard {
shard := NewShard(self.LocalServerId, time.Now(), time.Now(), LONG_TERM, self.wal)
shard.SetServers([]*ClusterServer{})
shard.SetLocalStore(self.shardStore, self.LocalServerId)
return []Shard{shard}
// GetShards returns the shards that should be consulted for querySpec.
//
// The query spec reports whether the short-term shards, the long-term shards,
// or both are relevant. When both are, the matching range of each set is
// collected (short term first), the combined list is handed to
// SortShardsByTimeDescending, and the result is returned. When only long-term
// is requested, the matching range of the long-term shards is returned. In
// every other case the matching range of the short-term shards is returned.
func (self *ClusterConfiguration) GetShards(querySpec QuerySpec) []*ShardData {
	wantShortTerm, wantLongTerm := querySpec.ShouldQueryShortTermAndLongTerm()

	switch {
	case wantLongTerm && wantShortTerm:
		fmt.Println("GetShards: long term and short term")
		// Build a fresh slice so the ranges (which alias the configuration's
		// own shard slices) are never appended to or sorted in place.
		combined := []*ShardData{}
		combined = append(combined, self.getShardRange(querySpec, self.shortTermShards)...)
		combined = append(combined, self.getShardRange(querySpec, self.longTermShards)...)
		SortShardsByTimeDescending(combined)
		return combined
	case wantLongTerm:
		fmt.Println("GetShards: long term")
		return self.getShardRange(querySpec, self.longTermShards)
	default:
		fmt.Println("GetShards: short term")
		return self.getShardRange(querySpec, self.shortTermShards)
	}
}
// getShardRange returns the contiguous run of shards that covers the time
// window of querySpec.
//
// The query's start and end times are converted to microseconds since the
// epoch (UnixNano / 1000) and tested against each shard with
// IsMicrosecondInRange. shards is expected to be in time-descending order, so
// the first shard of the result is the one containing the query's END time and
// the last shard of the result is the one containing the query's START time.
//
// Returns an empty slice when no shard contains the end time. When no shard at
// or after the first match contains the start time, everything from the first
// match to the end of shards is returned. The returned slice aliases shards.
//
// NOTE(review): the fmt.Println calls look like temporary debug tracing; they
// are kept as-is here — consider routing them through the project's logger.
func (self *ClusterConfiguration) getShardRange(querySpec QuerySpec, shards []*ShardData) []*ShardData {
	fmt.Println("---------------------- getShardRange")
	startTime := querySpec.GetStartTime().UnixNano() / 1000
	endTime := querySpec.GetEndTime().UnixNano() / 1000
	fmt.Println("StartTime, EndTime: ", startTime, endTime)

	// startIndex is the index of the first shard to return; endIndex is the
	// exclusive upper bound of the returned slice. -1 means "not found yet".
	startIndex := -1
	endIndex := -1

	// this logic looks a little weird because the shards passed into this function should
	// always be passed in time descending order. But start time is low and end time is high. just FYI.
	for i, shard := range shards {
		fmt.Println("shard: ", shard)
		if startIndex == -1 {
			if !shard.IsMicrosecondInRange(endTime) {
				continue
			}
			startIndex = i
			// Fall through: the same shard may also contain the start time
			// when the whole query window fits inside a single shard.
		}
		if shard.IsMicrosecondInRange(startTime) {
			// The shard holding the start time must be part of the result, so
			// the exclusive bound is one past its index.
			endIndex = i + 1
			break
		}
	}
	fmt.Println("StartIndex, EndIndex: ", startIndex, endIndex)
	fmt.Println("END ---------------------- getShardRange")

	if startIndex == -1 {
		return []*ShardData{}
	}
	if endIndex == -1 {
		endIndex = len(shards)
	}
	return shards[startIndex:endIndex]
}
func (self *ClusterConfiguration) HashDbAndSeriesToInt(database, series string) int {

View File

@ -154,6 +154,11 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt
func (self *CoordinatorImpl) runQuery(query *parser.Query, user common.User, database string, yield func(*protocol.Series) error) error {
querySpec := parser.NewQuerySpec(user, database, query)
shards := self.clusterConfiguration.GetShards(querySpec)
fmt.Println("COORD: runQuery shards ")
for _, s := range shards {
fmt.Println("shard: ", s)
}
fmt.Println("**************************")
responses := make([]chan *protocol.Response, len(shards), len(shards))
for i, shard := range shards {
responseChan := make(chan *protocol.Response, 1)

View File

@ -259,7 +259,7 @@ func (self *ServerSuite) TestQueryAgainstMultipleShards(c *C) {
t := (time.Now().Unix() - 3600) * 1000
data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_query_against_multiple_shards", "columns": ["value", "time"]}]`, t)
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
time.Sleep(time.Second * 2)
time.Sleep(time.Second)
collection := self.serverProcesses[0].Query("test_rep", "select count(value) from test_query_against_multiple_shards group by time(1h)", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_query_against_multiple_shards", c)

View File

@ -107,3 +107,15 @@ func (self *QuerySpec) IsSinglePointQuery() bool {
// SelectQuery returns the SelectQuery field of the query wrapped by this spec.
func (self *QuerySpec) SelectQuery() *SelectQuery {
	wrapped := self.query
	return wrapped.SelectQuery
}
// ShouldQueryShortTermAndLongTerm reports which shard sets this query needs.
//
// Every entry returned by SeriesValuesAndColumns is classified by the first
// byte of its Name: a byte below 'a' (ASCII 97) marks the query as needing
// long-term shards, any other byte marks it as needing short-term shards. Both
// results can be true when the query references names of both kinds, and both
// are false when there are no names to classify.
//
// Entries with an empty Name are skipped rather than indexed, because
// indexing the first byte of an empty name would panic.
func (self *QuerySpec) ShouldQueryShortTermAndLongTerm() (shouldQueryShortTerm bool, shouldQueryLongTerm bool) {
	for val := range self.SeriesValuesAndColumns() {
		name := val.Name
		if len(name) == 0 {
			continue
		}
		if name[0] < 'a' {
			shouldQueryLongTerm = true
		} else {
			shouldQueryShortTerm = true
		}
	}
	return shouldQueryShortTerm, shouldQueryLongTerm
}