Merge pull request #8178 from influxdata/er-test-race

Fix race in test helpers
pull/8225/head^2
Edd Robinson 2017-03-29 12:56:41 +01:00 committed by GitHub
commit 374b04ada0
3 changed files with 34 additions and 12 deletions

View File

@ -30,6 +30,7 @@
- [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors.
- [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered.
- [#8171](https://github.com/influxdata/influxdb/issues/8171): Significantly improve shutdown speed for high cardinality databases.
- [#8177](https://github.com/influxdata/influxdb/issues/8177): Fix racy integration test.
## v1.2.2 [2017-03-14]

View File

@ -12,6 +12,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"testing"
"time"
@ -43,13 +44,6 @@ type Server interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
// LocalServer is a Server that is running in-process and can be accessed directly
type LocalServer struct {
*client
*run.Server
Config *run.Config
}
// RemoteServer is a Server that is accessed remotely via the HTTP API
type RemoteServer struct {
*client
@ -247,8 +241,20 @@ func OpenDefaultServer(c *run.Config) Server {
return s
}
// LocalServer is a Server that is running in-process and can be accessed directly
type LocalServer struct {
mu sync.RWMutex
*run.Server
*client
Config *run.Config
}
// Close shuts down the server and removes all temporary paths.
func (s *LocalServer) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.Server.Close(); err != nil {
panic(err.Error())
}
@ -264,11 +270,15 @@ func (s *LocalServer) Close() {
}
func (s *LocalServer) Closed() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.Server == nil
}
// URL returns the base URL for the httpd endpoint.
func (s *LocalServer) URL() string {
s.mu.RLock()
defer s.mu.RUnlock()
for _, service := range s.Services {
if service, ok := service.(*httpd.Service); ok {
return "http://" + service.Addr().String()
@ -278,11 +288,15 @@ func (s *LocalServer) URL() string {
}
func (s *LocalServer) CreateDatabase(db string) (*meta.DatabaseInfo, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.CreateDatabase(db)
}
// CreateDatabaseAndRetentionPolicy will create the database and retention policy.
func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec, makeDefault bool) error {
s.mu.RLock()
defer s.mu.RUnlock()
if _, err := s.MetaClient.CreateDatabase(db); err != nil {
return err
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, makeDefault); err != nil {
@ -292,14 +306,20 @@ func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.Reten
}
func (s *LocalServer) CreateSubscription(database, rp, name, mode string, destinations []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.CreateSubscription(database, rp, name, mode, destinations)
}
func (s *LocalServer) DropDatabase(db string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.DropDatabase(db)
}
func (s *LocalServer) Reset() error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, db := range s.MetaClient.Databases() {
if err := s.DropDatabase(db.Name); err != nil {
return err
@ -308,6 +328,12 @@ func (s *LocalServer) Reset() error {
return nil
}
func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points)
}
// client abstract querying and writing to a Server using HTTP
type client struct {
URLFn func() string
@ -410,10 +436,6 @@ func (wr WriteError) Error() string {
return fmt.Sprintf("invalid status code: code=%d, body=%s", wr.statusCode, wr.body)
}
func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points)
}
// Write executes a write against the server and returns the results.
func (s *client) Write(db, rp, body string, params url.Values) (results string, err error) {
if params == nil {

View File

@ -7723,7 +7723,6 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
time.Sleep(10 * time.Millisecond)
close(done)
// Race occurs on s.Close()
}
// Ensure time in where clause is inclusive