Make coordinator a service

pull/2614/head
Jason Wilder 2015-05-19 14:56:52 -06:00
parent 08b53a9748
commit 1b4b463565
1 changed files with 32 additions and 1 deletions

View File

@ -45,6 +45,8 @@ var (
// data nodes.
type Coordinator struct {
mu sync.RWMutex
closing chan struct{}
MetaStore meta.Store
shardWriters []ShardWriter
}
@ -55,6 +57,12 @@ type ShardMapping struct {
Shards map[uint64]meta.ShardInfo // The shards that have been mapped, keyed by shard ID
}
func NewCoordinator() *Coordinator {
return &Coordinator{
closing: make(chan struct{}),
}
}
// NewShardMapping creates an empty ShardMapping
func NewShardMapping() *ShardMapping {
return &ShardMapping{
@ -81,6 +89,25 @@ type ShardWriter interface {
WriteShard(shardID uint64, points []data.Point) (int, error)
}
func (c *Coordinator) Open() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closing == nil {
c.closing = make(chan struct{})
}
return nil
}
func (c *Coordinator) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closing != nil {
close(c.closing)
c.closing = nil
}
return nil
}
func (c *Coordinator) AddShardWriter(s ShardWriter) {
c.mu.Lock()
defer c.mu.Unlock()
@ -140,6 +167,8 @@ func (c *Coordinator) Write(p *WritePointsRequest) error {
for range shardMappings.Points {
select {
case <-c.closing:
return ErrWriteFailed
case err := <-ch:
if err != nil {
return err
@ -188,6 +217,8 @@ func (c *Coordinator) writeToShards(shard meta.ShardInfo, consistency Consistenc
timeout := time.After(defaultWriteTimeout)
for range c.shardWriters {
select {
case <-c.closing:
return ErrWriteFailed
case <-timeout:
// return timeout error to caller
return ErrTimeout