From ad2d33f8590d0aa3b0c694689aa25616bcadf3ff Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 13 Oct 2016 19:16:34 +0100 Subject: [PATCH] Ensure input services can be safely opened and closed --- CHANGELOG.md | 1 + internal/meta_client.go | 162 +++++++++++++++++++++ services/collectd/service.go | 50 +++++-- services/collectd/service_test.go | 134 ++++++++++------- services/graphite/service.go | 29 +++- services/graphite/service_test.go | 234 +++++++++++++++++++----------- services/opentsdb/handler.go | 25 +++- services/opentsdb/service.go | 55 +++++-- services/opentsdb/service_test.go | 125 ++++++++++------ services/udp/service.go | 42 +++++- services/udp/service_test.go | 75 ++++++++++ 11 files changed, 716 insertions(+), 216 deletions(-) create mode 100644 internal/meta_client.go create mode 100644 services/udp/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d0fb0dbd90..9e8109fe0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - [#7470](https://github.com/influxdata/influxdb/pull/7470): Reduce map allocations when computing the TagSet of a measurement. - [#6894](https://github.com/influxdata/influxdb/issues/6894): Support `INFLUX_USERNAME` and `INFLUX_PASSWORD` for setting username/password in the CLI. - [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI. +- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent. ### Bugfixes diff --git a/internal/meta_client.go b/internal/meta_client.go new file mode 100644 index 0000000000..d5844a9c53 --- /dev/null +++ b/internal/meta_client.go @@ -0,0 +1,162 @@ +package internal + +import ( + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/services/meta" +) + +// MetaClientMock is a mockable implementation of meta.MetaClient. 
+type MetaClientMock struct { + CloseFn func() error + CreateContinuousQueryFn func(database, name, query string) error + CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) + CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) + CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) + CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error + CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) + + DatabaseFn func(name string) *meta.DatabaseInfo + DatabasesFn func() ([]meta.DatabaseInfo, error) + + DataFn func() meta.Data + DeleteShardGroupFn func(database string, policy string, id uint64) error + DropContinuousQueryFn func(database, name string) error + DropDatabaseFn func(name string) error + DropRetentionPolicyFn func(database, name string) error + DropSubscriptionFn func(database, rp, name string) error + DropShardFn func(id uint64) error + DropUserFn func(name string) error + + OpenFn func() error + + RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error) + + SetAdminPrivilegeFn func(username string, admin bool) error + SetDataFn func(*meta.Data) error + SetDefaultRetentionPolicyFn func(database, name string) error + SetPrivilegeFn func(username, database string, p influxql.Privilege) error + ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) + ShardOwnerFn func(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo) + UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error + UpdateUserFn func(name, password string) error + UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) + UserPrivilegesFn func(username string) 
(map[string]influxql.Privilege, error) + UsersFn func() []meta.UserInfo +} + +func (c *MetaClientMock) Close() error { + return c.CloseFn() +} + +func (c *MetaClientMock) CreateContinuousQuery(database, name, query string) error { + return c.CreateContinuousQueryFn(database, name, query) +} + +func (c *MetaClientMock) CreateDatabase(name string) (*meta.DatabaseInfo, error) { + return c.CreateDatabaseFn(name) +} + +func (c *MetaClientMock) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + return c.CreateDatabaseWithRetentionPolicyFn(name, spec) +} + +func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { + return c.CreateRetentionPolicyFn(database, spec) +} + +func (c *MetaClientMock) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { + return c.CreateShardGroupFn(database, policy, timestamp) +} + +func (c *MetaClientMock) CreateSubscription(database, rp, name, mode string, destinations []string) error { + return c.CreateSubscriptionFn(database, rp, name, mode, destinations) +} + +func (c *MetaClientMock) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) { + return c.CreateUserFn(name, password, admin) +} + +func (c *MetaClientMock) Database(name string) *meta.DatabaseInfo { + return c.DatabaseFn(name) +} + +func (c *MetaClientMock) Databases() ([]meta.DatabaseInfo, error) { + return c.DatabasesFn() +} + +func (c *MetaClientMock) DeleteShardGroup(database string, policy string, id uint64) error { + return c.DeleteShardGroupFn(database, policy, id) +} + +func (c *MetaClientMock) DropContinuousQuery(database, name string) error { + return c.DropContinuousQueryFn(database, name) +} + +func (c *MetaClientMock) DropDatabase(name string) error { + return c.DropDatabaseFn(name) +} + +func (c *MetaClientMock) DropRetentionPolicy(database, name string) error { + return 
c.DropRetentionPolicyFn(database, name) +} + +func (c *MetaClientMock) DropShard(id uint64) error { + return c.DropShardFn(id) +} + +func (c *MetaClientMock) DropSubscription(database, rp, name string) error { + return c.DropSubscriptionFn(database, rp, name) +} + +func (c *MetaClientMock) DropUser(name string) error { + return c.DropUserFn(name) +} + +func (c *MetaClientMock) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) { + return c.RetentionPolicyFn(database, name) +} + +func (c *MetaClientMock) SetAdminPrivilege(username string, admin bool) error { + return c.SetAdminPrivilegeFn(username, admin) +} + +func (c *MetaClientMock) SetDefaultRetentionPolicy(database, name string) error { + return c.SetDefaultRetentionPolicyFn(database, name) +} + +func (c *MetaClientMock) SetPrivilege(username, database string, p influxql.Privilege) error { + return c.SetPrivilegeFn(username, database, p) +} + +func (c *MetaClientMock) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) { + return c.ShardsByTimeRangeFn(sources, tmin, tmax) +} + +func (c *MetaClientMock) ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo) { + return c.ShardOwnerFn(shardID) +} + +func (c *MetaClientMock) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error { + return c.UpdateRetentionPolicyFn(database, name, rpu) +} + +func (c *MetaClientMock) UpdateUser(name, password string) error { + return c.UpdateUserFn(name, password) +} + +func (c *MetaClientMock) UserPrivilege(username, database string) (*influxql.Privilege, error) { + return c.UserPrivilegeFn(username, database) +} + +func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Privilege, error) { + return c.UserPrivilegesFn(username) +} + +func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() } + +func (c *MetaClientMock) Open() error { return c.OpenFn() } +func (c 
*MetaClientMock) Data() meta.Data { return c.DataFn() } +func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) } diff --git a/services/collectd/service.go b/services/collectd/service.go index f29e41c8da..6cd2b86b58 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -50,12 +50,14 @@ type Service struct { wg sync.WaitGroup err chan error - stop chan struct{} conn *net.UDPConn batcher *tsdb.PointBatcher typesdb gollectd.Types addr net.Addr + mu sync.Mutex + done chan struct{} + // expvar-based stats. stats *Statistics defaultTags models.StatisticTags @@ -78,6 +80,14 @@ func NewService(c Config) *Service { // Open starts the service. func (s *Service) Open() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.closed() { + return nil // Already open. + } + s.done = make(chan struct{}) + s.Logger.Printf("Starting collectd service") if s.Config.BindAddress == "" { @@ -142,7 +152,6 @@ func (s *Service) Open() error { s.typesdb = typesdb } } - // Resolve our address. addr, err := net.ResolveUDPAddr("udp", s.Config.BindAddress) if err != nil { @@ -171,8 +180,7 @@ func (s *Service) Open() error { s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration)) s.batcher.Start() - // Create channel and wait group for signalling goroutines to stop. - s.stop = make(chan struct{}) + // Create waitgroup for signalling goroutines to stop. s.wg.Add(2) // Start goroutines that process collectd packets. @@ -184,10 +192,15 @@ func (s *Service) Open() error { // Close stops the service. func (s *Service) Close() error { - // Close the connection, and wait for the goroutine to exit. - if s.stop != nil { - close(s.stop) + s.mu.Lock() + defer s.mu.Unlock() + + if s.closed() { + return nil // Already closed. } + close(s.done) + + // Close the connection, and wait for the goroutine to exit. 
if s.conn != nil { s.conn.Close() } @@ -197,13 +210,30 @@ func (s *Service) Close() error { s.wg.Wait() // Release all remaining resources. - s.stop = nil s.conn = nil s.batcher = nil s.Logger.Println("collectd UDP closed") + s.done = nil return nil } +// Closed returns true if the service is currently closed. +func (s *Service) Closed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed() +} + +func (s *Service) closed() bool { + select { + case <-s.done: + // Service is closing. + return true + default: + } + return s.done == nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { @@ -269,7 +299,7 @@ func (s *Service) serve() { for { select { - case <-s.stop: + case <-s.done: // We closed the connection, time to go. return default: @@ -310,7 +340,7 @@ func (s *Service) writePoints() { for { select { - case <-s.stop: + case <-s.done: return case batch := <-s.batcher.Out(): if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { diff --git a/services/collectd/service_test.go b/services/collectd/service_test.go index bbd4186353..76ae6e0327 100644 --- a/services/collectd/service_test.go +++ b/services/collectd/service_test.go @@ -9,33 +9,68 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/toml" ) +func TestService_OpenClose(t *testing.T) { + service := NewTestService(1, time.Second) + + // Closing a closed service is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + // Closing a closed service again is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Opening an already open service is fine. 
+ if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Reopening a previously opened service is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Tidy up. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } +} + // Test that the service checks / creates the target database on startup. func TestService_CreatesDatabase(t *testing.T) { t.Parallel() - s := newTestService(1, time.Second) + s := NewTestService(1, time.Second) - createDatabaseCalled := false - - ms := &testMetaClient{} - ms.CreateDatabaseIfNotExistsFn = func(name string) (*meta.DatabaseInfo, error) { + var created bool + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { if name != s.Config.Database { t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name) } - createDatabaseCalled = true + created = true return nil, nil } - s.Service.MetaClient = ms - s.Open() - s.Close() + s.Service.Open() + s.Service.Close() - if !createDatabaseCalled { + if !created { t.Errorf("CreateDatabaseIfNotExists should have been called when the service opened.") } } @@ -51,11 +86,10 @@ func TestService_BatchSize(t *testing.T) { for _, batchSize := range batchSizes { func() { - s := newTestService(batchSize, time.Second) + s := NewTestService(batchSize, time.Second) pointCh := make(chan models.Point) - s.MetaClient.CreateDatabaseIfNotExistsFn = func(name string) (*meta.DatabaseInfo, error) { return nil, nil } - s.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + s.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { if len(points) != batchSize { t.Errorf("\n\texp = %d\n\tgot = %d\n", batchSize, len(points)) } @@ -66,13 +100,13 @@ func TestService_BatchSize(t *testing.T) { return nil } - if 
err := s.Open(); err != nil { + if err := s.Service.Open(); err != nil { t.Fatal(err) } - defer func() { t.Log("closing service"); s.Close() }() + defer func() { t.Log("closing service"); s.Service.Close() }() // Get the address & port the service is listening on for collectd data. - addr := s.Addr() + addr := s.Service.Addr() conn, err := net.Dial("udp", addr.String()) if err != nil { t.Fatal(err) @@ -120,24 +154,23 @@ func TestService_BatchDuration(t *testing.T) { totalPoints := len(expPoints) - s := newTestService(5000, 250*time.Millisecond) + s := NewTestService(5000, 250*time.Millisecond) pointCh := make(chan models.Point, 1000) - s.MetaClient.CreateDatabaseIfNotExistsFn = func(name string) (*meta.DatabaseInfo, error) { return nil, nil } - s.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + s.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { for _, p := range points { pointCh <- p } return nil } - if err := s.Open(); err != nil { + if err := s.Service.Open(); err != nil { t.Fatal(err) } - defer func() { t.Log("closing service"); s.Close() }() + defer func() { t.Log("closing service"); s.Service.Close() }() // Get the address & port the service is listening on for collectd data. 
- addr := s.Addr() + addr := s.Service.Addr() conn, err := net.Dial("udp", addr.String()) if err != nil { t.Fatal(err) @@ -177,53 +210,50 @@ Loop: } } -type testService struct { - *Service - MetaClient testMetaClient - PointsWriter testPointsWriter +type TestService struct { + Service *Service + Config Config + MetaClient *internal.MetaClientMock + WritePointsFn func(string, string, models.ConsistencyLevel, []models.Point) error } -func newTestService(batchSize int, batchDuration time.Duration) *testService { - s := &testService{ - Service: NewService(Config{ - BindAddress: "127.0.0.1:0", - Database: "collectd_test", - BatchSize: batchSize, - BatchDuration: toml.Duration(batchDuration), - }), +func NewTestService(batchSize int, batchDuration time.Duration) *TestService { + c := Config{ + BindAddress: "127.0.0.1:0", + Database: "collectd_test", + BatchSize: batchSize, + BatchDuration: toml.Duration(batchDuration), } - s.Service.PointsWriter = &s.PointsWriter - s.Service.MetaClient = &s.MetaClient + + s := &TestService{ + Config: c, + Service: NewService(c), + MetaClient: &internal.MetaClientMock{}, + } + + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + return nil, nil + } + + s.Service.PointsWriter = s + s.Service.MetaClient = s.MetaClient // Set the collectd types using test string. 
- if err := s.SetTypes(typesDBText); err != nil { + if err := s.Service.SetTypes(typesDBText); err != nil { panic(err) } if !testing.Verbose() { - s.Logger = log.New(ioutil.Discard, "", log.LstdFlags) + s.Service.Logger = log.New(ioutil.Discard, "", log.LstdFlags) } return s } -type testPointsWriter struct { - WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error -} - -func (w *testPointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (w *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } -type testMetaClient struct { - CreateDatabaseIfNotExistsFn func(name string) (*meta.DatabaseInfo, error) - //DatabaseFn func(name string) (*meta.DatabaseInfo, error) -} - -func (ms *testMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) { - return ms.CreateDatabaseIfNotExistsFn(name) -} - func wait(c chan struct{}, d time.Duration) (err error) { select { case <-c: diff --git a/services/graphite/service.go b/services/graphite/service.go index 9b440dbd71..9ac0468b8d 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -108,7 +108,6 @@ func NewService(c Config) (*Service, error) { stats: &Statistics{}, defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress}, tcpConnections: make(map[string]*tcpConnection), - done: make(chan struct{}), diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), } @@ -130,6 +129,11 @@ func (s *Service) Open() error { s.mu.Lock() defer s.mu.Unlock() + if !s.closed() { + return nil // Already open. 
+ } + s.done = make(chan struct{}) + s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout) // Register diagnostics if a Monitor service is available. @@ -187,6 +191,11 @@ func (s *Service) Close() error { s.mu.Lock() defer s.mu.Unlock() + if s.closed() { + return nil // Already closed. + } + close(s.done) + s.closeAllConnections() if s.ln != nil { @@ -204,13 +213,29 @@ func (s *Service) Close() error { s.Monitor.DeregisterDiagnosticsClient(s.diagsKey) } - close(s.done) s.wg.Wait() s.done = nil return nil } +// Closed returns true if the service is currently closed. +func (s *Service) Closed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed() +} + +func (s *Service) closed() bool { + select { + case <-s.done: + // Service is closing. + return true + default: + } + return s.done == nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index 47b373d3a6..0b259daf1b 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -2,18 +2,58 @@ package graphite_test import ( "fmt" + "io/ioutil" "net" "sync" "testing" "time" + "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/graphite" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/toml" ) -func Test_ServerGraphiteTCP(t *testing.T) { +func Test_Service_OpenClose(t *testing.T) { + c := graphite.Config{BindAddress: ":35422"} + service := NewService(&c) + + // Closing a closed service is fine. + if err := service.GraphiteService.Close(); err != nil { + t.Fatal(err) + } + + // Closing a closed service again is fine. 
+ if err := service.GraphiteService.Close(); err != nil { + t.Fatal(err) + } + + if err := service.GraphiteService.Open(); err != nil { + t.Fatal(err) + } + + // Opening an already open service is fine. + if err := service.GraphiteService.Open(); err != nil { + t.Fatal(err) + } + + // Reopening a previously opened service is fine. + if err := service.GraphiteService.Close(); err != nil { + t.Fatal(err) + } + + if err := service.GraphiteService.Open(); err != nil { + t.Fatal(err) + } + + // Tidy up. + if err := service.GraphiteService.Close(); err != nil { + t.Fatal(err) + } +} + +func Test_Service_TCP(t *testing.T) { t.Parallel() now := time.Now().UTC().Round(time.Second) @@ -24,51 +64,52 @@ func Test_ServerGraphiteTCP(t *testing.T) { config.BatchTimeout = toml.Duration(time.Second) config.BindAddress = ":0" - service, err := graphite.NewService(config) - if err != nil { - t.Fatalf("failed to create Graphite service: %s", err.Error()) - } + service := NewService(&config) // Allow test to wait until points are written. 
var wg sync.WaitGroup wg.Add(1) - pointsWriter := PointsWriter{ - WritePointsFn: func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { - defer wg.Done() + service.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + defer wg.Done() - pt, _ := models.NewPoint( - "cpu", - models.NewTags(map[string]string{}), - map[string]interface{}{"value": 23.456}, - time.Unix(now.Unix(), 0)) + pt, _ := models.NewPoint( + "cpu", + models.NewTags(map[string]string{}), + map[string]interface{}{"value": 23.456}, + time.Unix(now.Unix(), 0)) - if database != "graphitedb" { - t.Fatalf("unexpected database: %s", database) - } else if retentionPolicy != "" { - t.Fatalf("unexpected retention policy: %s", retentionPolicy) - } else if len(points) != 1 { - t.Fatalf("expected 1 point, got %d", len(points)) - } else if points[0].String() != pt.String() { - t.Fatalf("expected point %v, got %v", pt.String(), points[0].String()) - } - return nil - }, + if database != "graphitedb" { + t.Fatalf("unexpected database: %s", database) + } else if retentionPolicy != "" { + t.Fatalf("unexpected retention policy: %s", retentionPolicy) + } else if len(points) != 1 { + t.Fatalf("expected 1 point, got %d", len(points)) + } else if points[0].String() != pt.String() { + t.Fatalf("expected point %v, got %v", pt.String(), points[0].String()) + } + return nil } - service.PointsWriter = &pointsWriter - dbCreator := DatabaseCreator{} - service.MetaClient = &dbCreator - if err := service.Open(); err != nil { + var created bool + service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + if db != config.Database { + t.Fatalf("got %s, expected %s", db, config.Database) + } + created = true + return nil, nil + } + + if err := service.GraphiteService.Open(); err != nil { t.Fatalf("failed to open Graphite service: 
%s", err.Error()) } - if !dbCreator.Created { + if !created { t.Fatalf("failed to create target database") } // Connect to the graphite endpoint we just spun up - _, port, _ := net.SplitHostPort(service.Addr().String()) + _, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String()) conn, err := net.Dial("tcp", "127.0.0.1:"+port) if err != nil { t.Fatal(err) @@ -88,7 +129,7 @@ func Test_ServerGraphiteTCP(t *testing.T) { wg.Wait() } -func Test_ServerGraphiteUDP(t *testing.T) { +func Test_Service_UDP(t *testing.T) { t.Parallel() now := time.Now().UTC().Round(time.Second) @@ -100,48 +141,49 @@ func Test_ServerGraphiteUDP(t *testing.T) { config.BindAddress = ":10000" config.Protocol = "udp" - service, err := graphite.NewService(config) - if err != nil { - t.Fatalf("failed to create Graphite service: %s", err.Error()) - } + service := NewService(&config) // Allow test to wait until points are written. var wg sync.WaitGroup wg.Add(1) - pointsWriter := PointsWriter{ - WritePointsFn: func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { - defer wg.Done() + service.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + defer wg.Done() - pt, _ := models.NewPoint( - "cpu", - models.NewTags(map[string]string{}), - map[string]interface{}{"value": 23.456}, - time.Unix(now.Unix(), 0)) - if database != "graphitedb" { - t.Fatalf("unexpected database: %s", database) - } else if retentionPolicy != "" { - t.Fatalf("unexpected retention policy: %s", retentionPolicy) - } else if points[0].String() != pt.String() { - t.Fatalf("unexpected points: %#v", points[0].String()) - } - return nil - }, + pt, _ := models.NewPoint( + "cpu", + models.NewTags(map[string]string{}), + map[string]interface{}{"value": 23.456}, + time.Unix(now.Unix(), 0)) + if database != "graphitedb" { + t.Fatalf("unexpected database: %s", database) + } else if retentionPolicy != 
"" { + t.Fatalf("unexpected retention policy: %s", retentionPolicy) + } else if points[0].String() != pt.String() { + t.Fatalf("unexpected points: %#v", points[0].String()) + } + return nil } - service.PointsWriter = &pointsWriter - dbCreator := DatabaseCreator{} - service.MetaClient = &dbCreator - if err := service.Open(); err != nil { + var created bool + service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + if db != config.Database { + t.Fatalf("got %s, expected %s", db, config.Database) + } + created = true + return nil, nil + } + + if err := service.GraphiteService.Open(); err != nil { t.Fatalf("failed to open Graphite service: %s", err.Error()) } - if !dbCreator.Created { + if !created { t.Fatalf("failed to create target database") } // Connect to the graphite endpoint we just spun up - _, port, _ := net.SplitHostPort(service.Addr().String()) + _, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String()) conn, err := net.Dial("udp", "127.0.0.1:"+port) if err != nil { t.Fatal(err) @@ -158,39 +200,59 @@ func Test_ServerGraphiteUDP(t *testing.T) { conn.Close() } -// PointsWriter represents a mock impl of PointsWriter. 
-type PointsWriter struct { +type Service struct { + GraphiteService *graphite.Service + + MetaClient *internal.MetaClientMock WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { - return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points) +func NewService(c *graphite.Config) *Service { + if c == nil { + defaultC := graphite.NewConfig() + c = &defaultC + } + + gservice, err := graphite.NewService(*c) + if err != nil { + panic(err) + } + + service := &Service{ + GraphiteService: gservice, + MetaClient: &internal.MetaClientMock{}, + } + + service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { + return nil, nil + } + + service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + return nil, nil + } + + service.MetaClient.DatabaseFn = func(string) *meta.DatabaseInfo { + return nil + } + + service.MetaClient.RetentionPolicyFn = func(string, string) (*meta.RetentionPolicyInfo, error) { + return nil, nil + } + + // Set the Meta Client + service.GraphiteService.MetaClient = service.MetaClient + + // Set the PointsWriter + service.GraphiteService.PointsWriter = service + + if !testing.Verbose() { + service.GraphiteService.SetLogOutput(ioutil.Discard) + } + return service } -type DatabaseCreator struct { - Created bool -} - -func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error) { - d.Created = true - return nil, nil -} - -func (d *DatabaseCreator) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { - return nil, nil -} - -func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) 
(*meta.DatabaseInfo, error) { - d.Created = true - return nil, nil -} - -func (d *DatabaseCreator) Database(name string) *meta.DatabaseInfo { - return nil -} - -func (d *DatabaseCreator) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { - return nil, nil +func (s *Service) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } // Test Helpers diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go index 7b3ccb0e6f..83a5c90b0d 100644 --- a/services/opentsdb/handler.go +++ b/services/opentsdb/handler.go @@ -9,6 +9,7 @@ import ( "log" "net" "net/http" + "sync" "sync/atomic" "time" @@ -138,8 +139,10 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { // chanListener represents a listener that receives connections through a channel. type chanListener struct { - addr net.Addr - ch chan net.Conn + addr net.Addr + ch chan net.Conn + done chan struct{} + closer sync.Once // closer ensures that Close is idempotent. } // newChanListener returns a new instance of chanListener. @@ -147,20 +150,28 @@ func newChanListener(addr net.Addr) *chanListener { return &chanListener{ addr: addr, ch: make(chan net.Conn), + done: make(chan struct{}), } } func (ln *chanListener) Accept() (net.Conn, error) { - conn, ok := <-ln.ch - if !ok { - return nil, errors.New("network connection closed") + errClosed := errors.New("network connection closed") + select { + case <-ln.done: + return nil, errClosed + case conn, ok := <-ln.ch: + if !ok { + return nil, errClosed + } + return conn, nil } - return conn, nil } // Close closes the connection channel. 
func (ln *chanListener) Close() error { - close(ln.ch) + ln.closer.Do(func() { + close(ln.done) + }) return nil } diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index d17944dcef..c382327823 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -83,7 +83,6 @@ func NewService(c Config) (*Service, error) { d := c.WithDefaults() s := &Service{ - done: make(chan struct{}), tls: d.TLSEnabled, cert: d.Certificate, err: make(chan error), @@ -106,6 +105,11 @@ func (s *Service) Open() error { s.mu.Lock() defer s.mu.Unlock() + if !s.closed() { + return nil // Already open. + } + s.done = make(chan struct{}) + s.Logger.Println("Starting OpenTSDB service") if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil { @@ -118,7 +122,7 @@ func (s *Service) Open() error { // Start processing batches. s.wg.Add(1) - go s.processBatches(s.batcher) + go func() { defer s.wg.Done(); s.processBatches(s.batcher) }() // Open listener. if s.tls { @@ -149,8 +153,8 @@ func (s *Service) Open() error { // Begin listening for connections. s.wg.Add(2) - go s.serveHTTP() - go s.serve() + go func() { defer s.wg.Done(); s.serve() }() + go func() { defer s.wg.Done(); s.serveHTTP() }() return nil } @@ -160,18 +164,46 @@ func (s *Service) Close() error { s.mu.Lock() defer s.mu.Unlock() - if s.ln != nil { - return s.ln.Close() + if s.closed() { + return nil // Already closed. } + close(s.done) + + // Close the listeners. + if err := s.ln.Close(); err != nil { + return err + } + if err := s.httpln.Close(); err != nil { + return err + } + + s.wg.Wait() + s.done = nil if s.batcher != nil { s.batcher.Stop() } - close(s.done) - s.wg.Wait() + return nil } +// Closed returns true if the service is currently closed. +func (s *Service) Closed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed() +} + +func (s *Service) closed() bool { + select { + case <-s.done: + // Service is closing. 
+ return true + default: + return s.done == nil + } +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { @@ -225,7 +257,7 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic { } // Err returns a channel for fatal errors that occur on the listener. -func (s *Service) Err() <-chan error { return s.err } +// func (s *Service) Err() <-chan error { return s.err } // Addr returns the listener's address. Returns nil if listener is closed. func (s *Service) Addr() net.Addr { @@ -237,8 +269,6 @@ func (s *Service) Addr() net.Addr { // serve serves the handler from the listener. func (s *Service) serve() { - defer s.wg.Done() - for { // Wait for next connection. conn, err := s.ln.Accept() @@ -282,6 +312,7 @@ func (s *Service) handleConn(conn net.Conn) { // Otherwise handle in telnet format. s.wg.Add(1) s.handleTelnetConn(conn) + s.wg.Done() } // handleTelnetConn accepts OpenTSDB's telnet protocol. @@ -289,7 +320,6 @@ func (s *Service) handleConn(conn net.Conn) { // put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 func (s *Service) handleTelnetConn(conn net.Conn) { defer conn.Close() - defer s.wg.Done() defer atomic.AddInt64(&s.stats.ActiveTelnetConnections, -1) atomic.AddInt64(&s.stats.ActiveTelnetConnections, 1) atomic.AddInt64(&s.stats.HandledTelnetConnections, 1) @@ -408,7 +438,6 @@ func (s *Service) serveHTTP() { // processBatches continually drains the given batcher and writes the batches to the database. 
func (s *Service) processBatches(batcher *tsdb.PointBatcher) { - defer s.wg.Done() for { select { case batch := <-batcher.Out(): diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index d75920a959..8e1642e51c 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -1,6 +1,7 @@ package opentsdb_test import ( + "fmt" "io/ioutil" "log" "net" @@ -12,24 +13,62 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/opentsdb" ) +func Test_Service_OpenClose(t *testing.T) { + service := NewService("db0", "127.0.0.1:45362") + + // Closing a closed service is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + // Closing a closed service again is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Opening an already open service is fine. + if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Reopening a previously opened service is fine. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } + + if err := service.Service.Open(); err != nil { + t.Fatal(err) + } + + // Tidy up. + if err := service.Service.Close(); err != nil { + t.Fatal(err) + } +} + // Ensure a point can be written via the telnet protocol. func TestService_Telnet(t *testing.T) { t.Parallel() - s := NewService("db0") - if err := s.Open(); err != nil { + s := NewService("db0", "127.0.0.1:0") + if err := s.Service.Open(); err != nil { t.Fatal(err) } - defer s.Close() + defer s.Service.Close() // Mock points writer. 
var called int32 - s.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + s.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { atomic.StoreInt32(&called, 1) if database != "db0" { @@ -50,7 +89,7 @@ func TestService_Telnet(t *testing.T) { } // Open connection to the service. - conn, err := net.Dial("tcp", s.Addr().String()) + conn, err := net.Dial("tcp", s.Service.Addr().String()) if err != nil { t.Fatal(err) } @@ -84,15 +123,15 @@ func TestService_Telnet(t *testing.T) { func TestService_HTTP(t *testing.T) { t.Parallel() - s := NewService("db0") - if err := s.Open(); err != nil { + s := NewService("db0", "127.0.0.1:0") + if err := s.Service.Open(); err != nil { t.Fatal(err) } - defer s.Close() + defer s.Service.Close() // Mock points writer. var called bool - s.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + s.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { called = true if database != "db0" { t.Fatalf("unexpected database: %s", database) @@ -113,7 +152,7 @@ func TestService_HTTP(t *testing.T) { } // Write HTTP request to server. 
- resp, err := http.Post("http://"+s.Addr().String()+"/api/put", "application/json", strings.NewReader(`{"metric":"sys.cpu.nice", "timestamp":1346846400, "value":18, "tags":{"host":"web01", "dc":"lga"}}`)) + resp, err := http.Post("http://"+s.Service.Addr().String()+"/api/put", "application/json", strings.NewReader(`{"metric":"sys.cpu.nice", "timestamp":1346846400, "value":18, "tags":{"host":"web01", "dc":"lga"}}`)) if err != nil { t.Fatal(err) } @@ -131,40 +170,44 @@ func TestService_HTTP(t *testing.T) { } type Service struct { - *opentsdb.Service - PointsWriter PointsWriter -} - -// NewService returns a new instance of Service. -func NewService(database string) *Service { - srv, _ := opentsdb.NewService(opentsdb.Config{ - BindAddress: "127.0.0.1:0", - Database: database, - ConsistencyLevel: "one", - }) - s := &Service{Service: srv} - s.Service.PointsWriter = &s.PointsWriter - s.Service.MetaClient = &DatabaseCreator{} - - if !testing.Verbose() { - s.Logger = log.New(ioutil.Discard, "", log.LstdFlags) - } - - return s -} - -// PointsWriter represents a mock impl of PointsWriter. -type PointsWriter struct { + Service *opentsdb.Service + MetaClient *internal.MetaClientMock WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { - return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points) +// NewService returns a new instance of Service. 
+func NewService(database string, bind string) *Service { + s, err := opentsdb.NewService(opentsdb.Config{ + BindAddress: bind, + Database: database, + ConsistencyLevel: "one", + }) + + if err != nil { + panic(err) + } + + service := &Service{ + Service: s, + MetaClient: &internal.MetaClientMock{}, + } + + service.MetaClient.CreateDatabaseFn = func(db string) (*meta.DatabaseInfo, error) { + if got, exp := db, database; got != exp { + return nil, fmt.Errorf("got %v, expected %v", got, exp) + } + return nil, nil + } + + if !testing.Verbose() { + service.Service.Logger = log.New(ioutil.Discard, "", log.LstdFlags) + } + + service.Service.MetaClient = service.MetaClient + service.Service.PointsWriter = service + return service } -type DatabaseCreator struct { -} - -func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error) { - return nil, nil +func (s *Service) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } diff --git a/services/udp/service.go b/services/udp/service.go index 5bdcf69cc3..f9403d8e0a 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -42,6 +42,8 @@ type Service struct { conn *net.UDPConn addr *net.UDPAddr wg sync.WaitGroup + + mu sync.Mutex done chan struct{} parserChan chan []byte @@ -66,7 +68,6 @@ func NewService(c Config) *Service { d := *c.WithDefaults() return &Service{ config: d, - done: make(chan struct{}), parserChan: make(chan []byte, parserChanLen), batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)), Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), @@ -77,6 +78,14 @@ func NewService(c Config) *Service { // Open starts the service func (s *Service) Open() (err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.closed() { + return nil // Already open. 
+ } + s.done = make(chan struct{}) + if s.config.BindAddress == "" { return errors.New("bind address has to be specified in config") } @@ -220,13 +229,19 @@ func (s *Service) parser() { // Close closes the underlying listener. func (s *Service) Close() error { - if s.conn == nil { - return errors.New("Service already closed") + s.mu.Lock() + defer s.mu.Unlock() + + if s.closed() { + return nil // Already closed. + } + close(s.done) + + if s.conn != nil { + s.conn.Close() } - s.conn.Close() s.batcher.Flush() - close(s.done) s.wg.Wait() // Release all remaining resources. @@ -238,6 +253,23 @@ func (s *Service) Close() error { return nil } +// Closed returns true if the service is currently closed. +func (s *Service) Closed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed() +} + +func (s *Service) closed() bool { + select { + case <-s.done: + // Service is closing. + return true + default: + } + return s.done == nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { diff --git a/services/udp/service_test.go b/services/udp/service_test.go new file mode 100644 index 0000000000..27c857e260 --- /dev/null +++ b/services/udp/service_test.go @@ -0,0 +1,75 @@ +package udp_test + +import ( + "io/ioutil" + "testing" + + "github.com/influxdata/influxdb/internal" + "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/influxdb/services/udp" +) + +func TestService_OpenClose(t *testing.T) { + service := NewService(nil) + + // Closing a closed service is fine. + if err := service.UDPService.Close(); err != nil { + t.Fatal(err) + } + + // Closing a closed service again is fine. + if err := service.UDPService.Close(); err != nil { + t.Fatal(err) + } + + if err := service.UDPService.Open(); err != nil { + t.Fatal(err) + } + + // Opening an already open service is fine. 
+ if err := service.UDPService.Open(); err != nil { + t.Fatal(err) + } + + // Reopening a previously opened service is fine. + if err := service.UDPService.Close(); err != nil { + t.Fatal(err) + } + + if err := service.UDPService.Open(); err != nil { + t.Fatal(err) + } + + // Tidy up. + if err := service.UDPService.Close(); err != nil { + t.Fatal(err) + } +} + +type Service struct { + UDPService *udp.Service + MetaClient *internal.MetaClientMock +} + +func NewService(c *udp.Config) *Service { + if c == nil { + defaultC := udp.NewConfig() + c = &defaultC + } + + service := &Service{ + UDPService: udp.NewService(*c), + MetaClient: &internal.MetaClientMock{}, + } + + service.MetaClient.CreateDatabaseFn = func(string) (*meta.DatabaseInfo, error) { return nil, nil } + + // Set the Meta Client + service.UDPService.MetaClient = service.MetaClient + + if !testing.Verbose() { + service.UDPService.SetLogOutput(ioutil.Discard) + } + + return service +}