diff --git a/CHANGELOG.md b/CHANGELOG.md index f991582acc..98cb6edd79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors. - [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered. - [#8171](https://github.com/influxdata/influxdb/issues/8171): Significantly improve shutdown speed for high cardinality databases. +- [#8177](https://github.com/influxdata/influxdb/issues/8177): Fix racy integration test. ## v1.2.2 [2017-03-14] diff --git a/tests/server_helpers.go b/tests/server_helpers.go index 592aa935e8..3aa634cb43 100644 --- a/tests/server_helpers.go +++ b/tests/server_helpers.go @@ -12,6 +12,7 @@ import ( "os" "regexp" "strings" + "sync" "testing" "time" @@ -43,13 +44,6 @@ type Server interface { WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -// LocalServer is a Server that is running in-process and can be accessed directly -type LocalServer struct { - *client - *run.Server - Config *run.Config -} - // RemoteServer is a Server that is accessed remotely via the HTTP API type RemoteServer struct { *client @@ -247,8 +241,20 @@ func OpenDefaultServer(c *run.Config) Server { return s } +// LocalServer is a Server that is running in-process and can be accessed directly +type LocalServer struct { + mu sync.RWMutex + *run.Server + + *client + Config *run.Config +} + // Close shuts down the server and removes all temporary paths. func (s *LocalServer) Close() { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.Server.Close(); err != nil { panic(err.Error()) } @@ -264,11 +270,15 @@ func (s *LocalServer) Close() { } func (s *LocalServer) Closed() bool { + s.mu.RLock() + defer s.mu.RUnlock() return s.Server == nil } // URL returns the base URL for the httpd endpoint. func (s *LocalServer) URL() string { + s.mu.RLock() + defer s.mu.RUnlock() for _, service := range s.Services { if service, ok := service.(*httpd.Service); ok { return "http://" + service.Addr().String() @@ -278,11 +288,15 @@ func (s *LocalServer) URL() string { } func (s *LocalServer) CreateDatabase(db string) (*meta.DatabaseInfo, error) { + s.mu.RLock() + defer s.mu.RUnlock() return s.MetaClient.CreateDatabase(db) } // CreateDatabaseAndRetentionPolicy will create the database and retention policy. func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec, makeDefault bool) error { + s.mu.RLock() + defer s.mu.RUnlock() if _, err := s.MetaClient.CreateDatabase(db); err != nil { return err } else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, makeDefault); err != nil { @@ -292,14 +306,20 @@ func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.Reten } func (s *LocalServer) CreateSubscription(database, rp, name, mode string, destinations []string) error { + s.mu.RLock() + defer s.mu.RUnlock() return s.MetaClient.CreateSubscription(database, rp, name, mode, destinations) } func (s *LocalServer) DropDatabase(db string) error { + s.mu.RLock() + defer s.mu.RUnlock() return s.MetaClient.DropDatabase(db) } func (s *LocalServer) Reset() error { + s.mu.RLock() + defer s.mu.RUnlock() for _, db := range s.MetaClient.Databases() { if err := s.DropDatabase(db.Name); err != nil { return err @@ -308,6 +328,12 @@ func (s *LocalServer) Reset() error { return nil } +func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points) +} + // client abstract querying and writing to a Server using HTTP type client struct { URLFn func() string @@ -410,10 +436,6 @@ func (wr WriteError) Error() string { return fmt.Sprintf("invalid status code: code=%d, body=%s", wr.statusCode, wr.body) } -func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { - return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points) -} - // Write executes a write against the server and returns the results. func (s *client) Write(db, rp, body string, params url.Values) (results string, err error) { if params == nil { diff --git a/tests/server_test.go b/tests/server_test.go index a17318eb66..b21a61153b 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -7723,7 +7723,6 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) { time.Sleep(10 * time.Millisecond) close(done) - // Race occurs on s.Close() } // Ensure time in where clause is inclusive