From b0f3dcc8cccfc8b977f5543bc34bf689b9a7291f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 4 Dec 2015 16:03:17 -0500 Subject: [PATCH] Update TSM metadata loading and write snapshot * Update WriteSnapshot to always call synchronously * Update LoadMetadataIndex to load WAL metadata from the cache --- tsdb/engine/tsm1/encoding.go | 2 +- tsdb/engine/tsm1/engine.go | 140 ++++++++++++-------------------- tsdb/engine/tsm1/engine_test.go | 2 +- 3 files changed, 53 insertions(+), 91 deletions(-) diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 127a0db4fc..5bc22e4ce7 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -104,7 +104,7 @@ func (a Values) Encode(buf []byte) ([]byte, error) { // InfluxQLType returns the influxql.DataType the values map to. func (a Values) InfluxQLType() (influxql.DataType, error) { if len(a) == 0 { - return influxql.Unknown, fmt.Errorf("no values in collection") + return influxql.Unknown, fmt.Errorf("no values to infer type") } switch a[0].Value().(type) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 2232ed6563..03f9e38a5e 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -39,12 +39,6 @@ type DevEngine struct { MaxPointsPerBlock int CacheFlushMemorySizeThreshold uint64 - - // walSeries is a temporary holder on startup for series that appear in the WAL - walSeries map[string]string - - // walFields is a temporary holder on startup for measurement fields defined in the WAL - walFields map[string]map[string]influxql.DataType } // NewDevEngine returns a new instance of Engine. @@ -135,79 +129,78 @@ func (e *DevEngine) SetLogOutput(w io.Writer) {} // LoadMetadataIndex loads the shard metadata into memory. func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { keys := e.FileStore.Keys() + + keysLoaded := make(map[string]bool) + for _, k := range keys { - seriesKey, field := seriesAndFieldFromCompositeKey(k) - measurement := tsdb.MeasurementFromSeriesKey(seriesKey) - - m := index.CreateMeasurementIndexIfNotExists(measurement) - m.SetFieldName(field) - typ, err := e.FileStore.Type(k) if err != nil { return err } - - mf := measurementFields[measurement] - if mf == nil { - mf = &tsdb.MeasurementFields{ - Fields: map[string]*tsdb.Field{}, - } - measurementFields[measurement] = mf - } - fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) if err != nil { return err } - if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { + if err := e.addToIndexFromKey(k, fieldType, index, measurementFields); err != nil { return err } - _, tags, err := models.ParseKey(seriesKey) - if err == nil { + keysLoaded[k] = true + } + + // load metadata from the Cache + e.Cache.mu.Lock() // shouldn't need the lock, but just to be safe + defer e.Cache.mu.Unlock() + + for key, entry := range e.Cache.store { + if keysLoaded[key] { + continue + } + + fieldType, err := entry.values.InfluxQLType() + if err != nil { + log.Printf("error getting the data type of values for key %s: %s", key, err.Error()) + continue + } + + if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil { return err } - - s := tsdb.NewSeries(seriesKey, tags) - s.InitializeShards() - index.CreateSeriesIndexIfNotExists(measurement, s) } - // load the measurement and field metadata from the WAL - for measurement, fields := range e.walFields { - m := index.CreateMeasurementIndexIfNotExists(measurement) + return nil +} - mf := measurementFields[measurement] - if mf == nil { - mf = &tsdb.MeasurementFields{ - Fields: map[string]*tsdb.Field{}, - } - measurementFields[measurement] = mf - } +// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the +// database index and measurement fields +func (e *DevEngine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { + seriesKey, field := seriesAndFieldFromCompositeKey(key) + measurement := tsdb.MeasurementFromSeriesKey(seriesKey) - for field, fieldType := range fields { - m.SetFieldName(field) - if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { - return err - } + m := index.CreateMeasurementIndexIfNotExists(measurement) + m.SetFieldName(field) + + mf := measurementFields[measurement] + if mf == nil { + mf = &tsdb.MeasurementFields{ + Fields: map[string]*tsdb.Field{}, } + measurementFields[measurement] = mf } - e.walFields = nil - // load the series from the WAL - for seriesKey, measurement := range e.walSeries { - _, tags, err := models.ParseKey(seriesKey) - if err == nil { - return err - } - - s := tsdb.NewSeries(seriesKey, tags) - s.InitializeShards() - - index.CreateSeriesIndexIfNotExists(measurement, s) + if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { + return err } - e.walSeries = nil + + _, tags, err := models.ParseKey(seriesKey) + if err == nil { + return err + } + + s := tsdb.NewSeries(seriesKey, tags) + s.InitializeShards() + index.CreateSeriesIndexIfNotExists(measurement, s) return nil } @@ -259,7 +252,7 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) { func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. -func (e *DevEngine) WriteSnapshot(async bool) error { +func (e *DevEngine) WriteSnapshot() error { // Lock and grab the cache snapshot along with all the closed WAL // filenames associated with the snapshot closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { @@ -284,11 +277,6 @@ func (e *DevEngine) WriteSnapshot(async bool) error { return err } - if async { - go e.writeSnapshotAndCommit(closedFiles, snapshot, compactor) - return nil - } - return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor) } @@ -323,7 +311,7 @@ func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache func (e *DevEngine) compact() { for { if e.Cache.Size() > e.CacheFlushMemorySizeThreshold { - err := e.WriteSnapshot(true) + err := e.WriteSnapshot() if err != nil { e.logger.Printf("error writing snapshot: %v", err) } @@ -366,9 +354,6 @@ func (e *DevEngine) reloadCache() error { return err } - e.walSeries = make(map[string]string) - e.walFields = make(map[string]map[string]influxql.DataType) - for _, fn := range files { f, err := os.Open(fn) if err != nil { @@ -391,7 +376,6 @@ func (e *DevEngine) reloadCache() error { if err := e.Cache.WriteMulti(t.Values); err != nil { return err } - e.loadWALMetadata(t.Values) case *DeleteWALEntry: // FIXME: Implement this // if err := e.Cache.Delete(t.Keys); err != nil { @@ -403,28 +387,6 @@ func (e *DevEngine) reloadCache() error { return nil } -// loadWALMetadata will put the series key, measurement name, field name and type into the temporary wal maps of metadata -func (e *DevEngine) loadWALMetadata(vals map[string][]Value) { - for k, v := range vals { - seriesKey, field := seriesAndFieldFromCompositeKey(k) - measurement := tsdb.MeasurementFromSeriesKey(seriesKey) - fieldType, err := Values(v).InfluxQLType() - if err != nil { - continue - } - - e.walSeries[seriesKey] = measurement - - m := e.walFields[measurement] - if m == nil { - m = make(map[string]influxql.DataType) - e.walFields[measurement] = m - } - - m[field] = fieldType - } -} - func (e *DevEngine) Read(key string, t time.Time) ([]Value, error) { e.mu.RLock() defer e.mu.RUnlock() diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index e3c88ff5d7..fba6059da7 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -260,7 +260,7 @@ func TestDevEngine_LoadMetadataIndex(t *testing.T) { } // write the snapshot, ensure we can close and load index from TSM - if err := e.WriteSnapshot(false); err != nil { + if err := e.WriteSnapshot(); err != nil { t.Fatalf("error writing snapshot: %s", err.Error()) }