Merge pull request #2773 from influxdb/pd-locking

Ensure proper locking of index structures on writes and queries.
pull/2780/head
Paul Dix 2015-06-04 19:50:49 -04:00
commit 55e0de30bd
6 changed files with 69 additions and 40 deletions

View File

@ -21,6 +21,7 @@ const (
)
// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
// Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
type DatabaseIndex struct {
// in memory metadata index, built on load and updated when new series come in
mu sync.RWMutex
@ -63,7 +64,7 @@ func (s *DatabaseIndex) createSeriesIndexIfNotExists(measurementName string, ser
series.measurement = m
s.series[series.Key] = series
m.addSeries(series)
m.AddSeries(series)
return series
}
@ -255,13 +256,12 @@ func (db *DatabaseIndex) DropSeries(keys []string) {
}
// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. These structures are accessed through private methods on the Measurement
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
// go routine safe access.
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks
type Measurement struct {
mu sync.RWMutex
Name string `json:"name,omitempty"`
FieldNames map[string]struct{} `json:"fieldNames,omitempty"`
Name string `json:"name,omitempty"`
fieldNames map[string]struct{}
index *DatabaseIndex
// in-memory index fields
@ -276,7 +276,7 @@ type Measurement struct {
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
return &Measurement{
Name: name,
FieldNames: make(map[string]struct{}),
fieldNames: make(map[string]struct{}),
index: idx,
series: make(map[string]*Series),
@ -286,8 +286,16 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
}
}
// seriesKeys returns the keys of every series in this measurement
func (m *Measurement) seriesKeys() []string {
// HasField returns true if the measurement has a field by the given name
func (m *Measurement) HasField(name string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, hasField := m.fieldNames[name]
return hasField
}
// SeriesKeys returns the keys of every series in this measurement
func (m *Measurement) SeriesKeys() []string {
m.mu.RLock()
defer m.mu.RUnlock()
var keys []string
@ -299,11 +307,24 @@ func (m *Measurement) seriesKeys() []string {
// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (m *Measurement) HasTagKey(k string) bool {
return m.seriesByTagKeyValue[k] != nil
m.mu.RLock()
defer m.mu.RUnlock()
_, hasTag := m.seriesByTagKeyValue[k]
return hasTag
}
// addSeries will add a series to the measurementIndex. Returns false if already present
func (m *Measurement) addSeries(s *Series) bool {
// HasSeries returns true if there is at least 1 series under this measurement
func (m *Measurement) HasSeries() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.seriesByID) > 0
}
// AddSeries will add a series to the measurementIndex. Returns false if already present
func (m *Measurement) AddSeries(s *Series) bool {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.seriesByID[s.id]; ok {
return false
}
@ -339,7 +360,7 @@ func (m *Measurement) addSeries(s *Series) bool {
return true
}
// dropSeries will remove a series from the measurementIndex. Returns true if already removed
// DropSeries will remove a series from the measurementIndex. Returns true if already removed
func (m *Measurement) DropSeries(seriesID uint64) bool {
m.mu.Lock()
defer m.mu.Unlock()
@ -390,11 +411,6 @@ func (m *Measurement) DropSeries(seriesID uint64) bool {
return true
}
// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
}
// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influxql.Expr, error) {
@ -434,6 +450,8 @@ func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influx
func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error) {
m.index.mu.RLock()
defer m.index.mu.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
// get the unique set of series ids and the filters that should be applied to each
filters, err := m.filters(stmt)
@ -568,7 +586,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Ex
// For fields, return all series IDs from this measurement and return
// the expression passed in, as the filter.
if _, ok := m.FieldNames[name.Val]; ok {
if m.HasField(name.Val) {
return m.seriesIDs, n, nil
}
@ -1104,8 +1122,10 @@ func timeBetweenInclusive(t, min, max time.Time) bool {
return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}
// tagKeys returns a list of the measurement's tag names.
func (m *Measurement) tagKeys() []string {
// TagKeys returns a list of the measurement's tag names.
func (m *Measurement) TagKeys() []string {
m.mu.RLock()
defer m.mu.RUnlock()
keys := make([]string, 0, len(m.seriesByTagKeyValue))
for k := range m.seriesByTagKeyValue {
keys = append(keys, k)
@ -1114,6 +1134,17 @@ func (m *Measurement) tagKeys() []string {
return keys
}
// FieldNames returns a list of the measurement's field names
func (m *Measurement) FieldNames() (a []string) {
m.mu.RLock()
defer m.mu.RUnlock()
for n, _ := range m.fieldNames {
a = append(a, n)
}
return
}
func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) map[string]stringSet {
// If no tag keys were passed, get all tag keys for the measurement.
if len(tagKeys) == 0 {

View File

@ -303,7 +303,7 @@ func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influx
}
// Get the fields for this measurement.
for name, _ := range mm.FieldNames {
for _, name := range mm.FieldNames() {
if _, ok := fieldSet[name]; ok {
continue
}
@ -312,7 +312,7 @@ func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influx
}
// Get the dimensions for this measurement.
for _, t := range mm.tagKeys() {
for _, t := range mm.TagKeys() {
if _, ok := dimensionSet[t]; ok {
continue
}
@ -407,7 +407,7 @@ func (q *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasu
db.DropMeasurement(m.Name)
// now drop the raw data
if err := q.store.deleteMeasurement(m.Name, m.seriesKeys()); err != nil {
if err := q.store.deleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
return &influxql.Result{Err: err}
}
@ -511,7 +511,7 @@ func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStat
// Make a new row for this measurement.
r := &influxql.Row{
Name: m.Name,
Columns: m.tagKeys(),
Columns: m.TagKeys(),
}
// Loop through series IDs getting matching tag sets.
@ -661,7 +661,7 @@ func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysSt
// TODO: filter tag keys by stmt.Condition
// Get the tag keys in sorted order.
keys := m.tagKeys()
keys := m.TagKeys()
// Convert keys to an [][]interface{}.
values := make([][]interface{}, 0, len(m.seriesByTagKeyValue))
@ -795,10 +795,7 @@ func (q *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKe
}
// Get a list of field names from the measurement then sort them.
names := make([]string, 0, len(m.FieldNames))
for n, _ := range m.FieldNames {
names = append(names, n)
}
names := m.FieldNames()
sort.Strings(names)
// Add the field names to the result row values.
@ -835,7 +832,7 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source)
} else {
// No measurements specified in FROM clause so get all measurements that have series.
for _, m := range db.Measurements() {
if len(m.seriesIDs) > 0 {
if m.HasSeries() {
measurements = append(measurements, m)
}
}

View File

@ -71,6 +71,9 @@ func (s *Shard) Open() error {
// close shuts down the shard's store.
func (s *Shard) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.db != nil {
_ = s.db.Close()
}
@ -78,7 +81,7 @@ func (s *Shard) Close() error {
}
// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
// into the tsdb package this should be removed. No one outside tsdb should know the underlying store
// into the tsdb package this should be removed. No one outside tsdb should know the underlying store.
func (s *Shard) DB() *bolt.DB {
return s.db
}
@ -321,7 +324,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*fieldCreate) (map[
// ensure the measurement is in the index and the field is there
measurement := s.index.createMeasurementIndexIfNotExists(f.measurement)
measurement.FieldNames[f.field.Name] = struct{}{}
measurement.fieldNames[f.field.Name] = struct{}{}
}
return measurementsToSave, nil
@ -395,7 +398,7 @@ func (s *Shard) loadMetadataIndex() error {
return err
}
for name, _ := range mf.Fields {
m.FieldNames[name] = struct{}{}
m.fieldNames[name] = struct{}{}
}
mf.codec = newFieldCodec(mf.Fields)
s.measurementFields[string(k)] = mf

View File

@ -48,7 +48,7 @@ func TestShardWriteAndIndex(t *testing.T) {
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[string(pt.Key())].Tags)
}
if !reflect.DeepEqual(index.measurements["cpu"].tagKeys(), []string{"host"}) {
if !reflect.DeepEqual(index.measurements["cpu"].TagKeys(), []string{"host"}) {
t.Fatalf("tag key wasn't saved to measurement index")
}
}

View File

@ -118,7 +118,7 @@ func (s *Store) Measurement(database, name string) *Measurement {
if db == nil {
return nil
}
return db.measurements[name]
return db.Measurement(name)
}
// deleteSeries lopos through the local shards and deletes the series data and metadata for the passed in series keys

View File

@ -73,8 +73,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
var selectTags []string
for _, n := range stmt.NamesInSelect() {
_, hasField := m.FieldNames[n]
if hasField {
if m.HasField(n) {
selectFields = append(selectFields, n)
continue
}
@ -88,8 +87,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
if n == "time" {
continue
}
_, hasField := m.FieldNames[n]
if hasField {
if m.HasField(n) {
whereFields = append(whereFields, n)
continue
}