influxdb/cluster/service_test.go

94 lines
1.8 KiB
Go
Raw Normal View History

2015-05-30 20:00:46 +00:00
package cluster_test
import (
"fmt"
2015-06-05 22:54:12 +00:00
"net"
2015-05-30 20:00:46 +00:00
"time"
2015-06-05 22:54:12 +00:00
"github.com/influxdb/influxdb/cluster"
2015-05-30 20:00:46 +00:00
"github.com/influxdb/influxdb/meta"
2015-06-05 22:54:12 +00:00
"github.com/influxdb/influxdb/tcp"
2015-05-30 20:00:46 +00:00
"github.com/influxdb/influxdb/tsdb"
)
// metaStore is a minimal stand-in for a meta store used by the tests in
// this file. Every node it reports carries the same host.
type metaStore struct {
	// host is the address copied into every NodeInfo returned by Node.
	host string
}
// Node returns a NodeInfo whose ID is the requested nodeID and whose Host is
// the store's configured host. The returned error is always nil.
func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
	info := meta.NodeInfo{ID: nodeID, Host: m.host}
	return &info, nil
}
type testService struct {
nodeID uint64
writeShardFunc func(shardID uint64, points []tsdb.Point) error
2015-06-05 22:54:12 +00:00
ln net.Listener
muxln net.Listener
2015-05-30 20:00:46 +00:00
}
func newTestService(f func(shardID uint64, points []tsdb.Point) error) testService {
2015-06-05 22:54:12 +00:00
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
mux := tcp.NewMux()
muxln := mux.Listen(cluster.MuxHeader)
go mux.Serve(ln)
2015-05-30 20:00:46 +00:00
return testService{
writeShardFunc: f,
2015-06-05 22:54:12 +00:00
ln: ln,
muxln: muxln,
}
}
func (ts *testService) Close() {
if ts.ln != nil {
ts.ln.Close()
2015-05-30 20:00:46 +00:00
}
}
// serviceResponses is a slice of serviceResponse values.
type serviceResponses []serviceResponse
// serviceResponse records the arguments of a single shard write so tests can
// inspect them afterwards.
type serviceResponse struct {
	shardID, ownerID uint64
	points           []tsdb.Point
}
func (t testService) WriteToShard(shardID uint64, points []tsdb.Point) error {
return t.writeShardFunc(shardID, points)
2015-05-30 20:00:46 +00:00
}
func writeShardSuccess(shardID uint64, points []tsdb.Point) error {
2015-05-30 20:00:46 +00:00
responses <- &serviceResponse{
shardID: shardID,
points: points,
}
return nil
}
func writeShardFail(shardID uint64, points []tsdb.Point) error {
2015-05-30 20:00:46 +00:00
return fmt.Errorf("failed to write")
}
// responses is a package-level buffered channel (capacity 1024).
// writeShardSuccess sends one entry per call and ResponseN receives from it.
var responses = make(chan *serviceResponse, 1024)
// ResponseN receives from the package-level responses channel until n
// responses have been collected, then returns them with a nil error.
//
// If no response arrives within one second of the previous one (or of the
// call, for the first), it returns the responses collected so far together
// with an error reporting the expected and actual counts. The timeout applies
// per receive, not to the call as a whole, because a fresh timer is created
// on every loop iteration.
//
// For n <= 0 there is nothing to wait for, so it returns immediately with no
// responses and a nil error. Without this guard the length check could never
// match and the call would always end in a timeout error.
func (testService) ResponseN(n int) ([]*serviceResponse, error) {
	if n <= 0 {
		return nil, nil
	}

	var a []*serviceResponse
	for {
		select {
		case r := <-responses:
			a = append(a, r)
			if len(a) == n {
				return a, nil
			}
		case <-time.After(time.Second):
			return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
		}
	}
}