diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 5f2b1fdfeb..127a0db4fc 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -6,6 +6,7 @@ import ( "sort" "time" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/tsdb" ) @@ -100,6 +101,26 @@ func (a Values) Encode(buf []byte) ([]byte, error) { return nil, fmt.Errorf("unsupported value type %T", a[0]) } +// InfluxQLType returns the influxql.DataType the values map to. +func (a Values) InfluxQLType() (influxql.DataType, error) { + if len(a) == 0 { + return influxql.Unknown, fmt.Errorf("no values in collection") + } + + switch a[0].Value().(type) { + case float64: + return influxql.Float, nil + case int64: + return influxql.Integer, nil + case bool: + return influxql.Boolean, nil + case string: + return influxql.String, nil + } + + return influxql.Unknown, fmt.Errorf("unsupported value type %T", a[0]) +} + // BlockType returns the type of value encoded in a block or an error // if the block type is unknown. func BlockType(block []byte) (byte, error) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 6027ba543a..2232ed6563 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -39,6 +39,12 @@ type DevEngine struct { MaxPointsPerBlock int CacheFlushMemorySizeThreshold uint64 + + // walSeries is a temporary holder on startup for series that appear in the WAL + walSeries map[string]string + + // walFields is a temporary holder on startup for measurement fields defined in the WAL + walFields map[string]map[string]influxql.DataType } // NewDevEngine returns a new instance of Engine. @@ -127,7 +133,7 @@ func (e *DevEngine) Close() error { func (e *DevEngine) SetLogOutput(w io.Writer) {} // LoadMetadataIndex loads the shard metadata into memory. -func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { +func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { keys := e.FileStore.Keys() for _, k := range keys { seriesKey, field := seriesAndFieldFromCompositeKey(k) @@ -149,25 +155,13 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd measurementFields[measurement] = mf } - switch typ { - case BlockFloat64: - if err := mf.CreateFieldIfNotExists(field, influxql.Float, false); err != nil { - return err - } - case BlockInt64: - if err := mf.CreateFieldIfNotExists(field, influxql.Integer, false); err != nil { - return err - } - case BlockBool: - if err := mf.CreateFieldIfNotExists(field, influxql.Boolean, false); err != nil { - return err - } - case BlockString: - if err := mf.CreateFieldIfNotExists(field, influxql.String, false); err != nil { - return err - } - default: - return fmt.Errorf("unkown block type for: %v. got %v", k, typ) + fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) + if err != nil { + return err + } + + if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { + return err } _, tags, err := models.ParseKey(seriesKey) @@ -179,6 +173,42 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd s.InitializeShards() index.CreateSeriesIndexIfNotExists(measurement, s) } + + // load the measurement and field metadata from the WAL + for measurement, fields := range e.walFields { + m := index.CreateMeasurementIndexIfNotExists(measurement) + + mf := measurementFields[measurement] + if mf == nil { + mf = &tsdb.MeasurementFields{ + Fields: map[string]*tsdb.Field{}, + } + measurementFields[measurement] = mf + } + + for field, fieldType := range fields { + m.SetFieldName(field) + if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { + return err + } + } + } + e.walFields = nil + + // load the series from the WAL + for seriesKey, measurement := range e.walSeries { + _, tags, err := models.ParseKey(seriesKey) + if err == nil { + return err + } + + s := tsdb.NewSeries(seriesKey, tags) + s.InitializeShards() + + index.CreateSeriesIndexIfNotExists(measurement, s) + } + e.walSeries = nil + return nil } @@ -228,7 +258,8 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) { func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } -func (e *DevEngine) writeSnapshot() error { +// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. +func (e *DevEngine) WriteSnapshot(async bool) error { // Lock and grab the cache snapshot along with all the closed WAL // filenames associated with the snapshot closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { @@ -253,30 +284,38 @@ func (e *DevEngine) writeSnapshot() error { return err } - go func() { - // write the new snapshot files - newFiles, err := compactor.WriteSnapshot(snapshot) - if err != nil { - e.logger.Printf("error writing snapshot from compactor: %v", err) - return - } + if async { + go e.writeSnapshotAndCommit(closedFiles, snapshot, compactor) + return nil + } - e.mu.RLock() - defer e.mu.RUnlock() + return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor) +} - // update the file store with these new files - if err := e.FileStore.Replace(nil, newFiles); err != nil { - e.logger.Printf("error adding new TSM files from snapshot: %v", err) - return - } +// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments +func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) error { + // write the new snapshot files + newFiles, err := compactor.WriteSnapshot(snapshot) + if err != nil { + e.logger.Printf("error writing snapshot from compactor: %v", err) + return err + } - // clear the snapshot from the in-memory cache, then the old WAL files - e.Cache.ClearSnapshot(snapshot) + e.mu.RLock() + defer e.mu.RUnlock() - if err := e.WAL.Remove(closedFiles); err != nil { - e.logger.Printf("error removing closed wal segments: %v", err) - } - }() + // update the file store with these new files + if err := e.FileStore.Replace(nil, newFiles); err != nil { + e.logger.Printf("error adding new TSM files from snapshot: %v", err) + return err + } + + // clear the snapshot from the in-memory cache, then the old WAL files + e.Cache.ClearSnapshot(snapshot) + + if err := e.WAL.Remove(closedFiles); err != nil { + e.logger.Printf("error removing closed wal segments: %v", err) + } return nil } @@ -284,7 +323,7 @@ func (e *DevEngine) writeSnapshot() error { func (e *DevEngine) compact() { for { if e.Cache.Size() > e.CacheFlushMemorySizeThreshold { - err := e.writeSnapshot() + err := e.WriteSnapshot(true) if err != nil { e.logger.Printf("error writing snapshot: %v", err) } @@ -318,12 +357,18 @@ func (e *DevEngine) compact() { } } +// reloadCache reads the WAL segment files and loads them into the cache. It also stores +// the measurements, series and fields defined in the WAL so that it can be used in the +// LoadMetadataFromIndex function. func (e *DevEngine) reloadCache() error { files, err := segmentFileNames(e.WAL.Path()) if err != nil { return err } + e.walSeries = make(map[string]string) + e.walFields = make(map[string]map[string]influxql.DataType) + for _, fn := range files { f, err := os.Open(fn) if err != nil { @@ -346,6 +391,7 @@ func (e *DevEngine) reloadCache() error { if err := e.Cache.WriteMulti(t.Values); err != nil { return err } + e.loadWALMetadata(t.Values) case *DeleteWALEntry: // FIXME: Implement this // if err := e.Cache.Delete(t.Keys); err != nil { @@ -357,6 +403,28 @@ func (e *DevEngine) reloadCache() error { return nil } +// loadWALMetadata will put the series key, measurement name, field name and type into the temporary wal maps of metadata +func (e *DevEngine) loadWALMetadata(vals map[string][]Value) { + for k, v := range vals { + seriesKey, field := seriesAndFieldFromCompositeKey(k) + measurement := tsdb.MeasurementFromSeriesKey(seriesKey) + fieldType, err := Values(v).InfluxQLType() + if err != nil { + continue + } + + e.walSeries[seriesKey] = measurement + + m := e.walFields[measurement] + if m == nil { + m = make(map[string]influxql.DataType) + e.walFields[measurement] = m + } + + m[field] = fieldType + } +} + func (e *DevEngine) Read(key string, t time.Time) ([]Value, error) { e.mu.RLock() defer e.mu.RUnlock() @@ -575,3 +643,18 @@ func (c *devCursor) nextTSM() (int64, interface{}) { return c.tsmValues[c.tsmPos].UnixNano(), c.tsmValues[c.tsmPos].Value() } } + +func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) { + switch typ { + case BlockFloat64: + return influxql.Float, nil + case BlockInt64: + return influxql.Integer, nil + case BlockBool: + return influxql.Boolean, nil + case BlockString: + return influxql.String, nil + default: + return influxql.Unknown, fmt.Errorf("unkown block type: %v", typ) + } +} diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 1c14e9f0f7..e3c88ff5d7 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path/filepath" + "reflect" "testing" "time" @@ -215,6 +216,104 @@ func TestDevEngine_QueryTSM_Descending(t *testing.T) { } } +func TestDevEngine_LoadMetadataIndex(t *testing.T) { + // Generate temporary file. + f, _ := ioutil.TempFile("", "tsm1dev") + f.Close() + os.Remove(f.Name()) + walPath := filepath.Join(f.Name(), "wal") + os.MkdirAll(walPath, 0777) + defer os.RemoveAll(f.Name()) + + // Create a few points. + p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p2 := parsePoint("cpu,host=B value=1.2 2000000000") + + // Write those points to the engine. + e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine) + if err := e.Open(); err != nil { + t.Fatalf("failed to open tsm1dev engine: %s", err.Error()) + } + if err := e.WritePoints([]models.Point{p1}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // ensure we can close and load index from the WAL + if err := e.Close(); err != nil { + t.Fatalf("error closing: %s", err.Error()) + } + if err := e.Open(); err != nil { + t.Fatalf("error opening: %s", err.Error()) + } + + // Load metadata index. + index := tsdb.NewDatabaseIndex() + if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + t.Fatal(err) + } + + // Verify index is correct. + if m := index.Measurement("cpu"); m == nil { + t.Fatal("measurement not found") + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) + } + + // write the snapshot, ensure we can close and load index from TSM + if err := e.WriteSnapshot(false); err != nil { + t.Fatalf("error writing snapshot: %s", err.Error()) + } + + // ensure we can close and load index from the WAL + if err := e.Close(); err != nil { + t.Fatalf("error closing: %s", err.Error()) + } + if err := e.Open(); err != nil { + t.Fatalf("error opening: %s", err.Error()) + } + + // Load metadata index. + index = tsdb.NewDatabaseIndex() + if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + t.Fatal(err) + } + + // Verify index is correct. + if m := index.Measurement("cpu"); m == nil { + t.Fatal("measurement not found") + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) + } + + // write a new point and ensure we can close and load index from TSM and WAL + if err := e.WritePoints([]models.Point{p2}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // ensure we can close and load index from the TSM and WAL + if err := e.Close(); err != nil { + t.Fatalf("error closing: %s", err.Error()) + } + if err := e.Open(); err != nil { + t.Fatalf("error opening: %s", err.Error()) + } + + // Load metadata index. + index = tsdb.NewDatabaseIndex() + if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + t.Fatal(err) + } + + // Verify index is correct. + if m := index.Measurement("cpu"); m == nil { + t.Fatal("measurement not found") + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) + } else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) { + t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) + } +} + func parsePoints(buf string) []models.Point { points, err := models.ParsePointsString(buf) if err != nil { diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index acb8e9f82c..a2eaa2c372 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -275,6 +275,7 @@ func (l *WAL) Close() error { if l.currentSegmentWriter != nil { l.currentSegmentWriter.Close() + l.currentSegmentWriter = nil } return nil