Reduce lock contention on measurementFields
parent
b7c1e625b0
commit
637a67ea35
|
@ -98,7 +98,9 @@ type Engine struct {
|
|||
traceLogging bool
|
||||
|
||||
// TODO(benbjohnson): Index needs to be moved entirely into engine.
|
||||
index *tsdb.DatabaseIndex
|
||||
index *tsdb.DatabaseIndex
|
||||
|
||||
fieldsMu sync.RWMutex
|
||||
measurementFields map[string]*tsdb.MeasurementFields
|
||||
|
||||
WAL *WAL
|
||||
|
@ -286,21 +288,21 @@ func (e *Engine) Index() *tsdb.DatabaseIndex {
|
|||
|
||||
// MeasurementFields returns the measurement fields for a measurement.
|
||||
func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
|
||||
e.mu.RLock()
|
||||
e.fieldsMu.RLock()
|
||||
m := e.measurementFields[measurement]
|
||||
e.mu.RUnlock()
|
||||
e.fieldsMu.RUnlock()
|
||||
|
||||
if m != nil {
|
||||
return m
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
e.fieldsMu.Lock()
|
||||
m = e.measurementFields[measurement]
|
||||
if m == nil {
|
||||
m = tsdb.NewMeasurementFields()
|
||||
e.measurementFields[measurement] = m
|
||||
}
|
||||
e.mu.Unlock()
|
||||
e.fieldsMu.Unlock()
|
||||
return m
|
||||
}
|
||||
|
||||
|
@ -628,11 +630,13 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
|
|||
m := index.CreateMeasurementIndexIfNotExists(measurement)
|
||||
m.SetFieldName(field)
|
||||
|
||||
e.fieldsMu.Lock()
|
||||
mf := e.measurementFields[measurement]
|
||||
if mf == nil {
|
||||
mf = tsdb.NewMeasurementFields()
|
||||
e.measurementFields[measurement] = mf
|
||||
}
|
||||
e.fieldsMu.Unlock()
|
||||
|
||||
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
|
||||
return err
|
||||
|
@ -803,9 +807,9 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
|
|||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
|
||||
e.mu.Lock()
|
||||
e.fieldsMu.Lock()
|
||||
delete(e.measurementFields, name)
|
||||
e.mu.Unlock()
|
||||
e.fieldsMu.Unlock()
|
||||
|
||||
return e.DeleteSeries(seriesKeys)
|
||||
}
|
||||
|
@ -1537,9 +1541,9 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
// buildCursor creates an untyped cursor for a field.
|
||||
func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef, opt influxql.IteratorOptions) cursor {
|
||||
// Look up fields for measurement.
|
||||
e.mu.RLock()
|
||||
e.fieldsMu.RLock()
|
||||
mf := e.measurementFields[measurement]
|
||||
e.mu.RUnlock()
|
||||
e.fieldsMu.RUnlock()
|
||||
|
||||
if mf == nil {
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue