Rename cluster package to coordinator
parent de04d0972d
commit 6cc1a34704
@@ -14,7 +14,7 @@ import (
     "time"
 
     "github.com/BurntSushi/toml"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/monitor"
     "github.com/influxdata/influxdb/services/admin"
     "github.com/influxdata/influxdb/services/collectd"
@@ -41,11 +41,11 @@ const (
 
 // Config represents the configuration format for the influxd binary.
 type Config struct {
-    Meta       *meta.Config      `toml:"meta"`
-    Data       tsdb.Config       `toml:"data"`
-    Cluster    cluster.Config    `toml:"cluster"`
-    Retention  retention.Config  `toml:"retention"`
-    Precreator precreator.Config `toml:"shard-precreation"`
+    Meta        *meta.Config       `toml:"meta"`
+    Data        tsdb.Config        `toml:"data"`
+    Coordinator coordinator.Config `toml:"coordinator"`
+    Retention   retention.Config   `toml:"retention"`
+    Precreator  precreator.Config  `toml:"shard-precreation"`
 
     Admin   admin.Config   `toml:"admin"`
     Monitor monitor.Config `toml:"monitor"`
@@ -76,7 +76,7 @@ func NewConfig() *Config {
     c := &Config{}
     c.Meta = meta.NewConfig()
     c.Data = tsdb.NewConfig()
-    c.Cluster = cluster.NewConfig()
+    c.Coordinator = coordinator.NewConfig()
     c.Precreator = precreator.NewConfig()
 
     c.Admin = admin.NewConfig()
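The rename is user-visible: the struct tag changes from `toml:"cluster"` to `toml:"coordinator"`, so settings that used to live under a [cluster] section of the influxd configuration are now read from [coordinator]. A minimal sketch of how the renamed section decodes, using only the option keys that appear in the coordinator config test later in this diff; the standalone main wrapper is illustrative and not part of this commit:

package main

import (
    "fmt"
    "log"

    "github.com/BurntSushi/toml"
    "github.com/influxdata/influxdb/coordinator"
)

func main() {
    // Keys that previously sat under [cluster] in influxdb.conf now decode
    // into coordinator.Config under [coordinator]; the key names themselves
    // are unchanged by this commit.
    var c coordinator.Config
    if _, err := toml.Decode(`
shard-writer-timeout = "10s"
write-timeout = "20s"
`, &c); err != nil {
        log.Fatal(err)
    }
    fmt.Println(c.ShardWriterTimeout, c.WriteTimeout)
}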
@@ -12,7 +12,7 @@ import (
     "time"
 
     "github.com/influxdata/influxdb"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/influxql"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/monitor"
@@ -58,7 +58,7 @@ type Server struct {
 
     TSDBStore     *tsdb.Store
     QueryExecutor *influxql.QueryExecutor
-    PointsWriter  *cluster.PointsWriter
+    PointsWriter  *coordinator.PointsWriter
     Subscriber    *subscriber.Service
 
     Services []Service
@@ -164,25 +164,25 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
     s.Subscriber = subscriber.NewService(c.Subscriber)
 
     // Initialize points writer.
-    s.PointsWriter = cluster.NewPointsWriter()
-    s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
+    s.PointsWriter = coordinator.NewPointsWriter()
+    s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
     s.PointsWriter.TSDBStore = s.TSDBStore
     s.PointsWriter.Subscriber = s.Subscriber
 
     // Initialize query executor.
     s.QueryExecutor = influxql.NewQueryExecutor()
-    s.QueryExecutor.StatementExecutor = &cluster.StatementExecutor{
+    s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
         MetaClient:        s.MetaClient,
-        TSDBStore:         cluster.LocalTSDBStore{Store: s.TSDBStore},
+        TSDBStore:         coordinator.LocalTSDBStore{Store: s.TSDBStore},
         Monitor:           s.Monitor,
         PointsWriter:      s.PointsWriter,
-        MaxSelectPointN:   c.Cluster.MaxSelectPointN,
-        MaxSelectSeriesN:  c.Cluster.MaxSelectSeriesN,
-        MaxSelectBucketsN: c.Cluster.MaxSelectBucketsN,
+        MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
+        MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
+        MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
     }
-    s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
-    s.QueryExecutor.LogQueriesAfter = time.Duration(c.Cluster.LogQueriesAfter)
-    s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries
+    s.QueryExecutor.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
+    s.QueryExecutor.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
+    s.QueryExecutor.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries
     if c.Data.QueryLogEnabled {
         s.QueryExecutor.Logger = log.New(os.Stderr, "[query] ", log.LstdFlags)
     }
@@ -490,12 +490,12 @@ type tcpaddr struct{ host string }
 func (a *tcpaddr) Network() string { return "tcp" }
 func (a *tcpaddr) String() string  { return a.host }
 
-// monitorPointsWriter is a wrapper around `cluster.PointsWriter` that helps
+// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
 // to prevent a circular dependency between the `cluster` and `monitor` packages.
-type monitorPointsWriter cluster.PointsWriter
+type monitorPointsWriter coordinator.PointsWriter
 
 func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
-    return (*cluster.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points)
+    return (*coordinator.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points)
 }
 
 func (s *Server) remoteAddr(addr string) string {
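The monitorPointsWriter hunk above is the one place where the rename touches a cross-package seam: server.go defines a local named type whose underlying type is coordinator.PointsWriter and converts the pointer back when delegating, so the monitor side can be handed a narrower WritePoints signature without importing the points writer's package. A self-contained sketch of that type-conversion adapter pattern with stand-in types (none of these names come from the commit; only the technique does):

package main

import "fmt"

// writer stands in for coordinator.PointsWriter; it lives in a package that
// must not be imported back by the caller's package.
type writer struct{ timeout int }

func (w *writer) WritePoints(db, rp, consistency string) error {
    fmt.Printf("write db=%s rp=%s consistency=%s timeout=%d\n", db, rp, consistency, w.timeout)
    return nil
}

// monitorWriter mirrors `type monitorPointsWriter coordinator.PointsWriter`:
// a named type with the same underlying type, defined on the caller's side,
// exposing a narrower WritePoints signature.
type monitorWriter writer

func (pw *monitorWriter) WritePoints(db, rp string) error {
    // The pointer conversion is a no-op at runtime; it only changes the
    // static type so the original method set is reachable again.
    return (*writer)(pw).WritePoints(db, rp, "any")
}

func main() {
    w := &writer{timeout: 10}
    _ = (*monitorWriter)(w).WritePoints("_internal", "monitor")
}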
@@ -230,8 +230,8 @@ func NewConfig() *run.Config {
     c := run.NewConfig()
     c.BindAddress = "127.0.0.1:0"
     c.ReportingDisabled = true
-    c.Cluster.ShardWriterTimeout = toml.Duration(30 * time.Second)
-    c.Cluster.WriteTimeout = toml.Duration(30 * time.Second)
+    c.Coordinator.ShardWriterTimeout = toml.Duration(30 * time.Second)
+    c.Coordinator.WriteTimeout = toml.Duration(30 * time.Second)
     c.Meta.Dir = MustTempFile()
 
     if !testing.Verbose() {
@@ -9,7 +9,7 @@ import (
     "testing"
     "time"
 
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/models"
 )
 
@@ -6097,7 +6097,7 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
         case <-done:
             return
         default:
-            wpr := &cluster.WritePointsRequest{
+            wpr := &coordinator.WritePointsRequest{
                 Database:        "db0",
                 RetentionPolicy: "rp0",
             }
@@ -1,4 +1,4 @@
-package cluster
+package coordinator
 
 import (
     "time"
@@ -1,16 +1,16 @@
-package cluster_test
+package coordinator_test
 
 import (
     "testing"
     "time"
 
     "github.com/BurntSushi/toml"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
 )
 
 func TestConfig_Parse(t *testing.T) {
     // Parse configuration.
-    var c cluster.Config
+    var c coordinator.Config
     if _, err := toml.Decode(`
 shard-writer-timeout = "10s"
 write-timeout = "20s"
@@ -1,4 +1,4 @@
-package cluster
+package coordinator
 
 import (
     "time"
@@ -1,4 +1,4 @@
-package cluster_test
+package coordinator_test
 
 import (
     "time"
@@ -1,4 +1,4 @@
-package cluster
+package coordinator
 
 import (
     "errors"
@@ -1,4 +1,4 @@
-package cluster_test
+package coordinator_test
 
 import (
     "fmt"
@@ -9,7 +9,7 @@ import (
     "time"
 
     "github.com/influxdata/influxdb"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/services/meta"
 )
@@ -30,15 +30,15 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
         return &rp.ShardGroups[0], nil
     }
 
-    c := cluster.PointsWriter{MetaClient: ms}
-    pr := &cluster.WritePointsRequest{
+    c := coordinator.PointsWriter{MetaClient: ms}
+    pr := &coordinator.WritePointsRequest{
         Database:        "mydb",
         RetentionPolicy: "myrp",
     }
     pr.AddPoint("cpu", 1.0, time.Now(), nil)
 
     var (
-        shardMappings *cluster.ShardMapping
+        shardMappings *coordinator.ShardMapping
         err           error
     )
     if shardMappings, err = c.MapShards(pr); err != nil {
@@ -79,8 +79,8 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
         panic("should not get here")
     }
 
-    c := cluster.PointsWriter{MetaClient: ms}
-    pr := &cluster.WritePointsRequest{
+    c := coordinator.PointsWriter{MetaClient: ms}
+    pr := &coordinator.WritePointsRequest{
         Database:        "mydb",
         RetentionPolicy: "myrp",
     }
@@ -92,7 +92,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
     pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
 
     var (
-        shardMappings *cluster.ShardMapping
+        shardMappings *coordinator.ShardMapping
         err           error
     )
     if shardMappings, err = c.MapShards(pr); err != nil {
@@ -150,7 +150,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
 
     for _, test := range tests {
 
-        pr := &cluster.WritePointsRequest{
+        pr := &coordinator.WritePointsRequest{
             Database:        test.database,
             RetentionPolicy: test.retentionPolicy,
         }
@@ -163,7 +163,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
 
         // copy to prevent data race
         theTest := test
-        sm := cluster.NewShardMapping()
+        sm := coordinator.NewShardMapping()
         sm.MapPoint(
             &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
                 {NodeID: 1},
@@ -186,7 +186,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
             }},
             pr.Points[2])
 
-        // Local cluster.Node ShardWriter
+        // Local coordinator.Node ShardWriter
         // lock on the write increment since these functions get called in parallel
         var mu sync.Mutex
         sw := &fakeShardWriter{
@@ -217,13 +217,13 @@ func TestPointsWriter_WritePoints(t *testing.T) {
         }
         ms.NodeIDFn = func() uint64 { return 1 }
 
-        subPoints := make(chan *cluster.WritePointsRequest, 1)
+        subPoints := make(chan *coordinator.WritePointsRequest, 1)
         sub := Subscriber{}
-        sub.PointsFn = func() chan<- *cluster.WritePointsRequest {
+        sub.PointsFn = func() chan<- *coordinator.WritePointsRequest {
             return subPoints
         }
 
-        c := cluster.NewPointsWriter()
+        c := coordinator.NewPointsWriter()
         c.MetaClient = ms
         c.ShardWriter = sw
         c.TSDBStore = store
@@ -337,10 +337,10 @@ func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *met
 }
 
 type Subscriber struct {
-    PointsFn func() chan<- *cluster.WritePointsRequest
+    PointsFn func() chan<- *coordinator.WritePointsRequest
 }
 
-func (s Subscriber) Points() chan<- *cluster.WritePointsRequest {
+func (s Subscriber) Points() chan<- *coordinator.WritePointsRequest {
     return s.PointsFn()
 }
 
@@ -1,4 +1,4 @@
-package cluster
+package coordinator
 
 import (
     "bytes"
@@ -1,4 +1,4 @@
-package cluster_test
+package coordinator_test
 
 import (
     "bytes"
@@ -11,7 +11,7 @@ import (
     "time"
 
     "github.com/davecgh/go-spew/spew"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/influxql"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/services/meta"
@@ -159,13 +159,13 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
     }
 }
 
-// QueryExecutor is a test wrapper for cluster.QueryExecutor.
+// QueryExecutor is a test wrapper for coordinator.QueryExecutor.
 type QueryExecutor struct {
     *influxql.QueryExecutor
 
     MetaClient        MetaClient
     TSDBStore         TSDBStore
-    StatementExecutor *cluster.StatementExecutor
+    StatementExecutor *coordinator.StatementExecutor
     LogOutput         bytes.Buffer
 }
 
@@ -175,7 +175,7 @@ func NewQueryExecutor() *QueryExecutor {
     e := &QueryExecutor{
         QueryExecutor: influxql.NewQueryExecutor(),
     }
-    e.StatementExecutor = &cluster.StatementExecutor{
+    e.StatementExecutor = &coordinator.StatementExecutor{
         MetaClient: &e.MetaClient,
         TSDBStore:  &e.TSDBStore,
     }
@@ -202,7 +202,7 @@ func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-ch
     return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, false, make(chan struct{}))
 }
 
-// TSDBStore is a mockable implementation of cluster.TSDBStore.
+// TSDBStore is a mockable implementation of coordinator.TSDBStore.
 type TSDBStore struct {
     CreateShardFn  func(database, policy string, shardID uint64) error
     WriteToShardFn func(shardID uint64, points []models.Point) error
@@ -9,7 +9,7 @@ import (
     "testing"
     "time"
 
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/influxql"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/services/meta"
@@ -395,7 +395,7 @@ func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) {
     return nil, meta.ErrServiceUnavailable
 }
 
-// Databases returns a list of database info about each database in the cluster.
+// Databases returns a list of database info about each database in the coordinator.
 func (ms *MetaClient) Databases() []meta.DatabaseInfo {
     ms.mu.RLock()
     defer ms.mu.RUnlock()
@@ -506,7 +506,7 @@ func NewQueryExecutor(t *testing.T) *QueryExecutor {
 
 // PointsWriter is a mock points writer.
 type PointsWriter struct {
-    WritePointsFn   func(p *cluster.WritePointsRequest) error
+    WritePointsFn   func(p *coordinator.WritePointsRequest) error
     Err             error
     PointsPerSecond int
     t               *testing.T
@@ -521,7 +521,7 @@ func NewPointsWriter(t *testing.T) *PointsWriter {
 }
 
 // WritePoints mocks writing points.
-func (pw *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
+func (pw *PointsWriter) WritePoints(p *coordinator.WritePointsRequest) error {
     // If the test set a callback, call it.
     if pw.WritePointsFn != nil {
         if err := pw.WritePointsFn(p); err != nil {
@@ -11,7 +11,7 @@ import (
     "sync"
 
     "github.com/influxdata/influxdb"
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
    "github.com/influxdata/influxdb/services/meta"
 )
 
@@ -24,7 +24,7 @@ const (
 // PointsWriter is an interface for writing points to a subscription destination.
 // Only WritePoints() needs to be satisfied.
 type PointsWriter interface {
-    WritePoints(p *cluster.WritePointsRequest) error
+    WritePoints(p *coordinator.WritePointsRequest) error
 }
 
 // unique set that identifies a given subscription
@@ -46,7 +46,7 @@ type Service struct {
     NewPointsWriter func(u url.URL) (PointsWriter, error)
     Logger          *log.Logger
     statMap         *expvar.Map
-    points          chan *cluster.WritePointsRequest
+    points          chan *coordinator.WritePointsRequest
     wg              sync.WaitGroup
     closed          bool
     closing         chan struct{}
@@ -60,7 +60,7 @@ func NewService(c Config) *Service {
         NewPointsWriter: newPointsWriter,
         Logger:          log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
         statMap:         influxdb.NewStatistics("subscriber", "subscriber", nil),
-        points:          make(chan *cluster.WritePointsRequest),
+        points:          make(chan *coordinator.WritePointsRequest),
         closed:          true,
         closing:         make(chan struct{}),
     }
@@ -214,7 +214,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
 }
 
 // Points returns a channel into which write point requests can be sent.
-func (s *Service) Points() chan<- *cluster.WritePointsRequest {
+func (s *Service) Points() chan<- *coordinator.WritePointsRequest {
     return s.points
 }
 
@@ -253,7 +253,7 @@ type balancewriter struct {
     i       int
 }
 
-func (b *balancewriter) WritePoints(p *cluster.WritePointsRequest) error {
+func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
     var lastErr error
     for range b.writers {
         // round robin through destinations.
@@ -5,7 +5,7 @@ import (
     "testing"
     "time"
 
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
     "github.com/influxdata/influxdb/services/meta"
     "github.com/influxdata/influxdb/services/subscriber"
 )
@@ -24,10 +24,10 @@ func (m MetaClient) WaitForDataChanged() chan struct{} {
 }
 
 type Subscription struct {
-    WritePointsFn func(*cluster.WritePointsRequest) error
+    WritePointsFn func(*coordinator.WritePointsRequest) error
 }
 
-func (s Subscription) WritePoints(p *cluster.WritePointsRequest) error {
+func (s Subscription) WritePoints(p *coordinator.WritePointsRequest) error {
     return s.WritePointsFn(p)
 }
 
@@ -53,11 +53,11 @@ func TestService_IgnoreNonMatch(t *testing.T) {
         }
     }
 
-    prs := make(chan *cluster.WritePointsRequest, 2)
+    prs := make(chan *coordinator.WritePointsRequest, 2)
     urls := make(chan url.URL, 2)
     newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
         sub := Subscription{}
-        sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
+        sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
             prs <- p
             return nil
         }
@@ -88,11 +88,11 @@ func TestService_IgnoreNonMatch(t *testing.T) {
     }
 
     // Write points that don't match any subscription.
-    s.Points() <- &cluster.WritePointsRequest{
+    s.Points() <- &coordinator.WritePointsRequest{
         Database:        "db1",
         RetentionPolicy: "rp0",
     }
-    s.Points() <- &cluster.WritePointsRequest{
+    s.Points() <- &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp2",
     }
@@ -128,11 +128,11 @@ func TestService_ModeALL(t *testing.T) {
         }
     }
 
-    prs := make(chan *cluster.WritePointsRequest, 2)
+    prs := make(chan *coordinator.WritePointsRequest, 2)
     urls := make(chan url.URL, 2)
     newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
         sub := Subscription{}
-        sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
+        sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
             prs <- p
             return nil
         }
@@ -163,7 +163,7 @@ func TestService_ModeALL(t *testing.T) {
     }
 
     // Write points that match subscription with mode ALL
-    expPR := &cluster.WritePointsRequest{
+    expPR := &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp0",
     }
@@ -171,7 +171,7 @@ func TestService_ModeALL(t *testing.T) {
 
     // Should get pr back twice
     for i := 0; i < 2; i++ {
-        var pr *cluster.WritePointsRequest
+        var pr *coordinator.WritePointsRequest
         select {
         case pr = <-prs:
         case <-time.After(10 * time.Millisecond):
@@ -206,11 +206,11 @@ func TestService_ModeANY(t *testing.T) {
         }
     }
 
-    prs := make(chan *cluster.WritePointsRequest, 2)
+    prs := make(chan *coordinator.WritePointsRequest, 2)
     urls := make(chan url.URL, 2)
     newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
         sub := Subscription{}
-        sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
+        sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
             prs <- p
             return nil
         }
@@ -240,14 +240,14 @@ func TestService_ModeANY(t *testing.T) {
         }
     }
     // Write points that match subscription with mode ANY
-    expPR := &cluster.WritePointsRequest{
+    expPR := &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp0",
     }
     s.Points() <- expPR
 
     // Validate we get the pr back just once
-    var pr *cluster.WritePointsRequest
+    var pr *coordinator.WritePointsRequest
     select {
     case pr = <-prs:
     case <-time.After(10 * time.Millisecond):
@@ -294,11 +294,11 @@ func TestService_Multiple(t *testing.T) {
         }
     }
 
-    prs := make(chan *cluster.WritePointsRequest, 4)
+    prs := make(chan *coordinator.WritePointsRequest, 4)
     urls := make(chan url.URL, 4)
     newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
         sub := Subscription{}
-        sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
+        sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
             prs <- p
             return nil
         }
@@ -329,24 +329,24 @@ func TestService_Multiple(t *testing.T) {
     }
 
     // Write points that don't match any subscription.
-    s.Points() <- &cluster.WritePointsRequest{
+    s.Points() <- &coordinator.WritePointsRequest{
         Database:        "db1",
         RetentionPolicy: "rp0",
     }
-    s.Points() <- &cluster.WritePointsRequest{
+    s.Points() <- &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp2",
     }
 
     // Write points that match subscription with mode ANY
-    expPR := &cluster.WritePointsRequest{
+    expPR := &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp0",
     }
     s.Points() <- expPR
 
     // Validate we get the pr back just once
-    var pr *cluster.WritePointsRequest
+    var pr *coordinator.WritePointsRequest
     select {
     case pr = <-prs:
     case <-time.After(10 * time.Millisecond):
@@ -364,7 +364,7 @@ func TestService_Multiple(t *testing.T) {
     }
 
     // Write points that match subscription with mode ALL
-    expPR = &cluster.WritePointsRequest{
+    expPR = &coordinator.WritePointsRequest{
         Database:        "db0",
         RetentionPolicy: "rp1",
     }
@@ -3,7 +3,7 @@ package subscriber
 import (
     "net"
 
-    "github.com/influxdata/influxdb/cluster"
+    "github.com/influxdata/influxdb/coordinator"
 )
 
 // UDP supports writing points over UDP using the line protocol.
@@ -17,7 +17,7 @@ func NewUDP(addr string) *UDP {
 }
 
 // WritePoints writes points over UDP transport.
-func (u *UDP) WritePoints(p *cluster.WritePointsRequest) (err error) {
+func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
     var addr *net.UDPAddr
     var con *net.UDPConn
     addr, err = net.ResolveUDPAddr("udp", u.addr)
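UDP above is one built-in implementation of the subscriber.PointsWriter interface, which after this commit traffics in *coordinator.WritePointsRequest. Since only WritePoints() needs to be satisfied, a custom destination stays small; a hedged sketch, where the logWriter type and the main wrapper are illustrative and not part of the tree:

package main

import (
    "fmt"
    "time"

    "github.com/influxdata/influxdb/coordinator"
    "github.com/influxdata/influxdb/services/subscriber"
)

// logWriter is a hypothetical subscription destination that just prints what
// it receives; providing WritePoints is enough to satisfy subscriber.PointsWriter.
type logWriter struct{ name string }

func (w logWriter) WritePoints(p *coordinator.WritePointsRequest) error {
    fmt.Printf("%s: %d point(s) for %s/%s\n", w.name, len(p.Points), p.Database, p.RetentionPolicy)
    return nil
}

func main() {
    // Build a request the same way the tests in this commit do.
    pr := &coordinator.WritePointsRequest{Database: "db0", RetentionPolicy: "rp0"}
    pr.AddPoint("cpu", 1.0, time.Now(), nil)

    var pw subscriber.PointsWriter = logWriter{name: "stdout"}
    _ = pw.WritePoints(pr)
}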
@@ -33,7 +33,7 @@ func TestSize_UnmarshalText_GB(t *testing.T) {
 
 func TestConfig_Encode(t *testing.T) {
     var c run.Config
-    c.Cluster.WriteTimeout = itoml.Duration(time.Minute)
+    c.Coordinator.WriteTimeout = itoml.Duration(time.Minute)
     buf := new(bytes.Buffer)
     if err := toml.NewEncoder(buf).Encode(&c); err != nil {
         t.Fatal("Failed to encode: ", err)