Implement simple remote node choice policy

pull/3320/head
Philip O'Toole 2015-07-15 19:53:10 -07:00
parent f41d2bab5d
commit e254245f2f
3 changed files with 21 additions and 10 deletions

View File

@ -1,6 +1,7 @@
package cluster
import (
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
@ -17,7 +18,7 @@ type ShardMapper struct {
}
RemoteMapper interface {
CreateMapper(nodeID uint64, shardID, query string, chunkSize int)
CreateMapper(nodeID, shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}
}
@ -25,11 +26,21 @@ func NewShardMapper() *ShardMapper {
return &ShardMapper{}
}
func (r *ShardMapper) CreateMapper(shardID uint64, stmt string, chunkSize int) (tsdb.Mapper, error) {
// Shard is local for now.
m, err := r.TSDBStore.CreateMapper(shardID, stmt, chunkSize)
if err != nil {
return nil, err
func (r *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
var err error
var m tsdb.Mapper
if sh.OwnedBy(r.MetaStore.NodeID()) {
m, err = r.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}
} else {
// Pick first available node for now. This will be replaced by balancing.
m, err = r.RemoteMapper.CreateMapper(sh.OwnerIDs[0], sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}
}
return m, nil
}

View File

@ -38,7 +38,7 @@ type QueryExecutor struct {
// Maps shards for queries.
ShardMapper interface {
CreateMapper(shardID uint64, stmt string, chunkSize int) (Mapper, error)
CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error)
}
Logger *log.Logger
@ -240,7 +240,7 @@ func (q *QueryExecutor) plan(stmt *influxql.SelectStatement, chunkSize int) (Exe
// Build the Mappers, one per shard.
mappers := []Mapper{}
for _, sh := range shards {
m, err := q.ShardMapper.CreateMapper(sh.ID, stmt.String(), chunkSize)
m, err := q.ShardMapper.CreateMapper(sh, stmt.String(), chunkSize)
if err != nil {
return nil, err
}

View File

@ -470,8 +470,8 @@ type testShardMapper struct {
store *Store
}
func (t *testShardMapper) CreateMapper(shardID uint64, stmt string, chunkSize int) (Mapper, error) {
m, err := t.store.CreateMapper(shardID, stmt, chunkSize)
func (t *testShardMapper) CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error) {
m, err := t.store.CreateMapper(shard.ID, stmt, chunkSize)
return m, err
}