Ensure syncronised access to engine

pull/6151/head
Edd Robinson 2016-03-29 22:55:09 +01:00
parent e7cce69530
commit 75a2218fa1
1 changed files with 19 additions and 6 deletions

View File

@ -76,11 +76,11 @@ type Shard struct {
database string
retentionPolicy string
engine Engine
options EngineOptions
mu sync.RWMutex
measurementFields map[string]*MeasurementFields // measurement name to their fields
engine Engine
// expvar-based stats.
statMap *expvar.Map
@ -217,6 +217,9 @@ type SeriesCreate struct {
// WritePoints will write the raw data points and any new metadata to the index in the shard
func (s *Shard) WritePoints(points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
s.statMap.Add(statWriteReq, 1)
seriesToCreate, fieldsToCreate, seriesToAddShardTo, err := s.validateSeriesAndFields(points)
@ -255,9 +258,7 @@ func (s *Shard) WritePoints(points []models.Point) error {
}
// This was populated earlier, don't need to validate that it's there.
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
@ -284,6 +285,8 @@ func (s *Shard) WritePoints(points []models.Point) error {
// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.DeleteSeries(seriesKeys)
}
@ -307,8 +310,8 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
return nil, nil
}
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
// add fields
measurementsToSave := make(map[string]*MeasurementFields)
@ -387,10 +390,16 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
}
// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() }
func (s *Shard) SeriesCount() (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.SeriesCount()
}
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
n, err := s.engine.WriteTo(w)
s.statMap.Add(statWriteBytes, int64(n))
return n, err
@ -401,6 +410,8 @@ func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.CreateIterator(opt)
}
@ -472,6 +483,8 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
return []influxql.Series{{Aux: auxFields}}, nil
}
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.SeriesKeys(opt)
}