From 408bc3f81e33992bba30d4cb612aacaf1c6fbca5 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 4 Jun 2015 14:50:32 -0400 Subject: [PATCH 1/2] Ensure proper locking of index structures on writes and queries. --- tsdb/meta.go | 60 +++++++++++++++++++++++++++++++++--------- tsdb/query_executor.go | 17 +++++------- tsdb/shard.go | 9 ++++--- tsdb/shard_test.go | 2 +- tsdb/store.go | 2 +- tsdb/tx.go | 6 ++--- 6 files changed, 65 insertions(+), 31 deletions(-) diff --git a/tsdb/meta.go b/tsdb/meta.go index 42bbbe9098..ca03ce35e3 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -21,6 +21,7 @@ const ( ) // DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. +// Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks type DatabaseIndex struct { // in memory metadata index, built on load and updated when new series come in mu sync.RWMutex @@ -63,7 +64,7 @@ func (s *DatabaseIndex) createSeriesIndexIfNotExists(measurementName string, ser series.measurement = m s.series[series.Key] = series - m.addSeries(series) + m.AddSeries(series) return series } @@ -255,13 +256,12 @@ func (db *DatabaseIndex) DropSeries(keys []string) { } // Measurement represents a collection of time series in a database. It also contains in memory -// structures for indexing tags. These structures are accessed through private methods on the Measurement -// object. Generally these methods are only accessed from Index, which is responsible for ensuring -// go routine safe access. +// structures for indexing tags. Exported functions are goroutine safe while un-exported functions +// assume the caller will use the appropriate locks type Measurement struct { mu sync.RWMutex Name string `json:"name,omitempty"` - FieldNames map[string]struct{} `json:"fieldNames,omitempty"` + fieldNames map[string]struct{} `json:"fieldNames,omitempty"` index *DatabaseIndex // in-memory index fields @@ -276,7 +276,7 @@ type Measurement struct { func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { return &Measurement{ Name: name, - FieldNames: make(map[string]struct{}), + fieldNames: make(map[string]struct{}), index: idx, series: make(map[string]*Series), @@ -286,8 +286,16 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { } } +// hasField returns true if the measurement has a field by the given name +func (m *Measurement) HasField(name string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, hasField := m.fieldNames[name] + return hasField +} + // seriesKeys returns the keys of every series in this measurement -func (m *Measurement) seriesKeys() []string { +func (m *Measurement) SeriesKeys() []string { m.mu.RLock() defer m.mu.RUnlock() var keys []string @@ -299,11 +307,24 @@ func (m *Measurement) seriesKeys() []string { // HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key func (m *Measurement) HasTagKey(k string) bool { - return m.seriesByTagKeyValue[k] != nil + m.mu.RLock() + defer m.mu.RUnlock() + _, hasTag := m.seriesByTagKeyValue[k] + return hasTag +} + +// HasSeries returns true if there is at least 1 series under this measurement +func (m *Measurement) HasSeries() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.seriesByID) > 0 } // addSeries will add a series to the measurementIndex. Returns false if already present -func (m *Measurement) addSeries(s *Series) bool { +func (m *Measurement) AddSeries(s *Series) bool { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.seriesByID[s.id]; ok { return false } @@ -434,6 +455,8 @@ func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influx func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error) { m.index.mu.RLock() defer m.index.mu.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() // get the unique set of series ids and the filters that should be applied to each filters, err := m.filters(stmt) @@ -568,7 +591,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Ex // For fields, return all series IDs from this measurement and return // the expression passed in, as the filter. - if _, ok := m.FieldNames[name.Val]; ok { + if m.HasField(name.Val) { return m.seriesIDs, n, nil } @@ -1104,8 +1127,10 @@ func timeBetweenInclusive(t, min, max time.Time) bool { return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max)) } -// tagKeys returns a list of the measurement's tag names. -func (m *Measurement) tagKeys() []string { +// TagKeys returns a list of the measurement's tag names. +func (m *Measurement) TagKeys() []string { + m.mu.RLock() + defer m.mu.RUnlock() keys := make([]string, 0, len(m.seriesByTagKeyValue)) for k := range m.seriesByTagKeyValue { keys = append(keys, k) @@ -1114,6 +1139,17 @@ func (m *Measurement) tagKeys() []string { return keys } +// FieldNames returns a list of the measurement's field names +func (m *Measurement) FieldNames() (a []string) { + m.mu.RLock() + defer m.mu.RUnlock() + + for n, _ := range m.fieldNames { + a = append(a, n) + } + return +} + func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) map[string]stringSet { // If no tag keys were passed, get all tag keys for the measurement. if len(tagKeys) == 0 { diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 8c35d47047..71f0e8afa8 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -303,7 +303,7 @@ func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influx } // Get the fields for this measurement. - for name, _ := range mm.FieldNames { + for _, name := range mm.FieldNames() { if _, ok := fieldSet[name]; ok { continue } @@ -312,7 +312,7 @@ func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influx } // Get the dimensions for this measurement. - for _, t := range mm.tagKeys() { + for _, t := range mm.TagKeys() { if _, ok := dimensionSet[t]; ok { continue } @@ -407,7 +407,7 @@ func (q *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasu db.DropMeasurement(m.Name) // now drop the raw data - if err := q.store.deleteMeasurement(m.Name, m.seriesKeys()); err != nil { + if err := q.store.deleteMeasurement(m.Name, m.SeriesKeys()); err != nil { return &influxql.Result{Err: err} } @@ -511,7 +511,7 @@ func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStat // Make a new row for this measurement. r := &influxql.Row{ Name: m.Name, - Columns: m.tagKeys(), + Columns: m.TagKeys(), } // Loop through series IDs getting matching tag sets. @@ -661,7 +661,7 @@ func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysSt // TODO: filter tag keys by stmt.Condition // Get the tag keys in sorted order. - keys := m.tagKeys() + keys := m.TagKeys() // Convert keys to an [][]interface{}. values := make([][]interface{}, 0, len(m.seriesByTagKeyValue)) @@ -795,10 +795,7 @@ func (q *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKe } // Get a list of field names from the measurement then sort them. - names := make([]string, 0, len(m.FieldNames)) - for n, _ := range m.FieldNames { - names = append(names, n) - } + names := m.FieldNames() sort.Strings(names) // Add the field names to the result row values. @@ -835,7 +832,7 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) } else { // No measurements specified in FROM clause so get all measurements that have series. for _, m := range db.Measurements() { - if len(m.seriesIDs) > 0 { + if m.HasSeries() { measurements = append(measurements, m) } } diff --git a/tsdb/shard.go b/tsdb/shard.go index c9e05b06c7..2d99ba6032 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -71,6 +71,9 @@ func (s *Shard) Open() error { // close shuts down the shard's store. func (s *Shard) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.db != nil { _ = s.db.Close() } @@ -78,7 +81,7 @@ func (s *Shard) Close() error { } // TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored -// into the tsdb package this should be removed. No one outside tsdb should know the underlying store +// into the tsdb package this should be removed. No one outside tsdb should know the underlying store. func (s *Shard) DB() *bolt.DB { return s.db } @@ -321,7 +324,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*fieldCreate) (map[ // ensure the measurement is in the index and the field is there measurement := s.index.createMeasurementIndexIfNotExists(f.measurement) - measurement.FieldNames[f.field.Name] = struct{}{} + measurement.fieldNames[f.field.Name] = struct{}{} } return measurementsToSave, nil @@ -395,7 +398,7 @@ func (s *Shard) loadMetadataIndex() error { return err } for name, _ := range mf.Fields { - m.FieldNames[name] = struct{}{} + m.fieldNames[name] = struct{}{} } mf.codec = newFieldCodec(mf.Fields) s.measurementFields[string(k)] = mf diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index f4283e377d..35a99f0737 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -48,7 +48,7 @@ func TestShardWriteAndIndex(t *testing.T) { if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[string(pt.Key())].Tags) } - if !reflect.DeepEqual(index.measurements["cpu"].tagKeys(), []string{"host"}) { + if !reflect.DeepEqual(index.measurements["cpu"].TagKeys(), []string{"host"}) { t.Fatalf("tag key wasn't saved to measurement index") } } diff --git a/tsdb/store.go b/tsdb/store.go index 2081dcce09..cfd2dbfc43 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -94,7 +94,7 @@ func (s *Store) Measurement(database, name string) *Measurement { if db == nil { return nil } - return db.measurements[name] + return db.Measurement(name) } // deleteSeries lopos through the local shards and deletes the series data and metadata for the passed in series keys diff --git a/tsdb/tx.go b/tsdb/tx.go index 1de299c735..2ec5afe428 100644 --- a/tsdb/tx.go +++ b/tsdb/tx.go @@ -73,8 +73,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri var selectTags []string for _, n := range stmt.NamesInSelect() { - _, hasField := m.FieldNames[n] - if hasField { + if m.HasField(n) { selectFields = append(selectFields, n) continue } @@ -88,8 +87,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri if n == "time" { continue } - _, hasField := m.FieldNames[n] - if hasField { + if m.HasField(n) { whereFields = append(whereFields, n) continue } From 9bf09ee026fb8de01dcbbd723d998373c4f05f90 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 4 Jun 2015 16:08:12 -0400 Subject: [PATCH 2/2] Correct comments in tsdb/meta --- tsdb/meta.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/tsdb/meta.go b/tsdb/meta.go index ca03ce35e3..9bb2c9539e 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -260,8 +260,8 @@ func (db *DatabaseIndex) DropSeries(keys []string) { // assume the caller will use the appropriate locks type Measurement struct { mu sync.RWMutex - Name string `json:"name,omitempty"` - fieldNames map[string]struct{} `json:"fieldNames,omitempty"` + Name string `json:"name,omitempty"` + fieldNames map[string]struct{} index *DatabaseIndex // in-memory index fields @@ -286,7 +286,7 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { } } -// hasField returns true if the measurement has a field by the given name +// HasField returns true if the measurement has a field by the given name func (m *Measurement) HasField(name string) bool { m.mu.RLock() defer m.mu.RUnlock() @@ -294,7 +294,7 @@ func (m *Measurement) HasField(name string) bool { return hasField } -// seriesKeys returns the keys of every series in this measurement +// SeriesKeys returns the keys of every series in this measurement func (m *Measurement) SeriesKeys() []string { m.mu.RLock() defer m.mu.RUnlock() @@ -320,7 +320,7 @@ func (m *Measurement) HasSeries() bool { return len(m.seriesByID) > 0 } -// addSeries will add a series to the measurementIndex. Returns false if already present +// AddSeries will add a series to the measurementIndex. Returns false if already present func (m *Measurement) AddSeries(s *Series) bool { m.mu.Lock() defer m.mu.Unlock() @@ -360,7 +360,7 @@ func (m *Measurement) AddSeries(s *Series) bool { return true } -// dropSeries will remove a series from the measurementIndex. Returns true if already removed +// DropSeries will remove a series from the measurementIndex. Returns true if already removed func (m *Measurement) DropSeries(seriesID uint64) bool { m.mu.Lock() defer m.mu.Unlock() @@ -411,11 +411,6 @@ func (m *Measurement) DropSeries(seriesID uint64) bool { return true } -// seriesByTags returns the Series that matches the given tagset. -func (m *Measurement) seriesByTags(tags map[string]string) *Series { - return m.series[string(marshalTags(tags))] -} - // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influxql.Expr, error) {