Update TSM engine, WAL and encoding

* Add InfluxQLType to Values to map the TSM type to InfluxQL
* Fix bug in WAL where Close wouldn't nil out the currentSegmentWriter after closing it
* Export writeSnapshot as WriteSnapshot for use in tests, with an argument to run it asynchronously or synchronously
* Update reloadCache to load temporary WAL metadata (series and fields) into the engine
* Update LoadMetadataIndex to use the temp WAL metadata information
pull/4990/head
Paul Dix 2015-12-04 11:09:39 -05:00
parent b7bae53405
commit eafb703afc
4 changed files with 246 additions and 42 deletions

View File

@@ -6,6 +6,7 @@ import (
"sort"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)
@@ -100,6 +101,26 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
return nil, fmt.Errorf("unsupported value type %T", a[0])
}
// InfluxQLType returns the influxql.DataType the values map to.
func (a Values) InfluxQLType() (influxql.DataType, error) {
if len(a) == 0 {
return influxql.Unknown, fmt.Errorf("no values in collection")
}
switch a[0].Value().(type) {
case float64:
return influxql.Float, nil
case int64:
return influxql.Integer, nil
case bool:
return influxql.Boolean, nil
case string:
return influxql.String, nil
}
return influxql.Unknown, fmt.Errorf("unsupported value type %T", a[0])
}
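// For illustration only (not part of this commit): a minimal sketch of how the new
// InfluxQLType method behaves, assuming the package's NewValue constructor and the
// fmt, time and influxql imports available in package tsm1.
func exampleInfluxQLType() error {
    vals := Values{
        NewValue(time.Unix(0, 1000000000), 1.1),
        NewValue(time.Unix(0, 2000000000), 2.2),
    }
    typ, err := vals.InfluxQLType()
    if err != nil {
        return err
    }
    fmt.Println(typ == influxql.Float) // true: float64 values map to influxql.Float
    return nil
}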
// BlockType returns the type of value encoded in a block or an error
// if the block type is unknown.
func BlockType(block []byte) (byte, error) {

View File

@@ -39,6 +39,12 @@ type DevEngine struct {
MaxPointsPerBlock int
CacheFlushMemorySizeThreshold uint64
// walSeries is a temporary holder on startup for series that appear in the WAL
walSeries map[string]string
// walFields is a temporary holder on startup for measurement fields defined in the WAL
walFields map[string]map[string]influxql.DataType
}
// NewDevEngine returns a new instance of Engine.
@@ -127,7 +133,7 @@ func (e *DevEngine) Close() error {
func (e *DevEngine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
keys := e.FileStore.Keys()
for _, k := range keys {
seriesKey, field := seriesAndFieldFromCompositeKey(k)
@@ -149,25 +155,13 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd
measurementFields[measurement] = mf
}
switch typ {
case BlockFloat64:
if err := mf.CreateFieldIfNotExists(field, influxql.Float, false); err != nil {
return err
}
case BlockInt64:
if err := mf.CreateFieldIfNotExists(field, influxql.Integer, false); err != nil {
return err
}
case BlockBool:
if err := mf.CreateFieldIfNotExists(field, influxql.Boolean, false); err != nil {
return err
}
case BlockString:
if err := mf.CreateFieldIfNotExists(field, influxql.String, false); err != nil {
return err
}
default:
return fmt.Errorf("unkown block type for: %v. got %v", k, typ)
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
}
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err
}
_, tags, err := models.ParseKey(seriesKey)
@@ -179,6 +173,42 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
}
// load the measurement and field metadata from the WAL
for measurement, fields := range e.walFields {
m := index.CreateMeasurementIndexIfNotExists(measurement)
mf := measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
}
for field, fieldType := range fields {
m.SetFieldName(field)
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err
}
}
}
e.walFields = nil
// load the series from the WAL
for seriesKey, measurement := range e.walSeries {
_, tags, err := models.ParseKey(seriesKey)
if err != nil {
return err
}
s := tsdb.NewSeries(seriesKey, tags)
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
}
e.walSeries = nil
return nil
}
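// For illustration only (not part of this commit): the startup order this change
// relies on, mirroring the test added below. Open replays the WAL via reloadCache,
// which fills the temporary walSeries/walFields maps; the shard then calls
// LoadMetadataIndex, which merges TSM and WAL metadata into the index and nils out
// the temporary maps. The function and variable names here are hypothetical.
func exampleStartup(path, walPath string) error {
    e := NewDevEngine(path, walPath, tsdb.NewEngineOptions()).(*DevEngine)
    if err := e.Open(); err != nil { // replays WAL segments, filling walSeries and walFields
        return err
    }
    index := tsdb.NewDatabaseIndex()
    fields := make(map[string]*tsdb.MeasurementFields)
    // merges field types from TSM blocks first, then the temporary WAL metadata
    if err := e.LoadMetadataIndex(nil, index, fields); err != nil {
        return err
    }
    // walSeries and walFields are nil from here on; they are only needed at startup
    return nil
}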
@@ -228,7 +258,8 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) {
func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
func (e *DevEngine) writeSnapshot() error {
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *DevEngine) WriteSnapshot(async bool) error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot
closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
@@ -253,30 +284,38 @@ func (e *DevEngine) writeSnapshot() error {
return err
}
go func() {
// write the new snapshot files
newFiles, err := compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Printf("error writing snapshot from compactor: %v", err)
return
}
if async {
go e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
return nil
}
e.mu.RLock()
defer e.mu.RUnlock()
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
}
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
e.logger.Printf("error adding new TSM files from snapshot: %v", err)
return
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) error {
// write the new snapshot files
newFiles, err := compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Printf("error writing snapshot from compactor: %v", err)
return err
}
// clear the snapshot from the in-memory cache, then the old WAL files
e.Cache.ClearSnapshot(snapshot)
e.mu.RLock()
defer e.mu.RUnlock()
if err := e.WAL.Remove(closedFiles); err != nil {
e.logger.Printf("error removing closed wal segments: %v", err)
}
}()
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
e.logger.Printf("error adding new TSM files from snapshot: %v", err)
return err
}
// clear the snapshot from the in-memory cache, then the old WAL files
e.Cache.ClearSnapshot(snapshot)
if err := e.WAL.Remove(closedFiles); err != nil {
e.logger.Printf("error removing closed wal segments: %v", err)
}
return nil
}
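// For illustration only (not part of this commit): how the exported WriteSnapshot
// is driven in its two modes. The compaction loop (next hunk) passes true so the
// snapshot is written in the background; the new test passes false so the TSM file
// is fully on disk before the engine is closed and reopened. The helper name below
// is hypothetical.
func snapshotNow(e *DevEngine, blocking bool) error {
    if blocking {
        // returns only after the TSM file is written, the cache snapshot is
        // cleared and the old WAL segments are removed
        return e.WriteSnapshot(false)
    }
    // kicks off writeSnapshotAndCommit in a goroutine and returns immediately
    return e.WriteSnapshot(true)
}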
@@ -284,7 +323,7 @@ func (e *DevEngine) writeSnapshot() error {
func (e *DevEngine) compact() {
for {
if e.Cache.Size() > e.CacheFlushMemorySizeThreshold {
err := e.writeSnapshot()
err := e.WriteSnapshot(true)
if err != nil {
e.logger.Printf("error writing snapshot: %v", err)
}
@@ -318,12 +357,18 @@ func (e *DevEngine) compact() {
}
}
// reloadCache reads the WAL segment files and loads them into the cache. It also stores
// the measurements, series and fields defined in the WAL so that they can be used in the
// LoadMetadataIndex function.
func (e *DevEngine) reloadCache() error {
files, err := segmentFileNames(e.WAL.Path())
if err != nil {
return err
}
e.walSeries = make(map[string]string)
e.walFields = make(map[string]map[string]influxql.DataType)
for _, fn := range files {
f, err := os.Open(fn)
if err != nil {
@@ -346,6 +391,7 @@ func (e *DevEngine) reloadCache() error {
if err := e.Cache.WriteMulti(t.Values); err != nil {
return err
}
e.loadWALMetadata(t.Values)
case *DeleteWALEntry:
// FIXME: Implement this
// if err := e.Cache.Delete(t.Keys); err != nil {
@@ -357,6 +403,28 @@ func (e *DevEngine) reloadCache() error {
return nil
}
// loadWALMetadata stores the series key, measurement name, field name and field type in the temporary WAL metadata maps.
func (e *DevEngine) loadWALMetadata(vals map[string][]Value) {
for k, v := range vals {
seriesKey, field := seriesAndFieldFromCompositeKey(k)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
fieldType, err := Values(v).InfluxQLType()
if err != nil {
continue
}
e.walSeries[seriesKey] = measurement
m := e.walFields[measurement]
if m == nil {
m = make(map[string]influxql.DataType)
e.walFields[measurement] = m
}
m[field] = fieldType
}
}
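// For illustration only (not part of this commit): the shape of the data
// loadWALMetadata receives from a replayed WriteWALEntry and the state it leaves
// behind. The compositeKey helper is hypothetical and stands in for however the
// engine joins a series key and a field name into a single cache key; NewValue is
// assumed to be the package's value constructor.
func exampleLoadWALMetadata(e *DevEngine) {
    e.walSeries = make(map[string]string)
    e.walFields = make(map[string]map[string]influxql.DataType)
    key := compositeKey("cpu,host=A", "value") // hypothetical helper
    vals := map[string][]Value{
        key: {NewValue(time.Unix(0, 1000000000), 1.1)},
    }
    e.loadWALMetadata(vals)
    // afterwards, roughly:
    //   e.walSeries["cpu,host=A"] == "cpu"
    //   e.walFields["cpu"]["value"] == influxql.Float
}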
func (e *DevEngine) Read(key string, t time.Time) ([]Value, error) {
e.mu.RLock()
defer e.mu.RUnlock()
@@ -575,3 +643,18 @@ func (c *devCursor) nextTSM() (int64, interface{}) {
return c.tsmValues[c.tsmPos].UnixNano(), c.tsmValues[c.tsmPos].Value()
}
}
func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
switch typ {
case BlockFloat64:
return influxql.Float, nil
case BlockInt64:
return influxql.Integer, nil
case BlockBool:
return influxql.Boolean, nil
case BlockString:
return influxql.String, nil
default:
return influxql.Unknown, fmt.Errorf("unkown block type: %v", typ)
}
}

View File

@@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"
"time"
@@ -215,6 +216,104 @@ func TestDevEngine_QueryTSM_Descending(t *testing.T) {
}
}
func TestDevEngine_LoadMetadataIndex(t *testing.T) {
// Generate temporary file.
f, _ := ioutil.TempFile("", "tsm1dev")
f.Close()
os.Remove(f.Name())
walPath := filepath.Join(f.Name(), "wal")
os.MkdirAll(walPath, 0777)
defer os.RemoveAll(f.Name())
// Create a few points.
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=B value=1.2 2000000000")
// Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine)
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1dev engine: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p1}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// ensure we can close and load index from the WAL
if err := e.Close(); err != nil {
t.Fatalf("error closing: %s", err.Error())
}
if err := e.Open(); err != nil {
t.Fatalf("error opening: %s", err.Error())
}
// Load metadata index.
index := tsdb.NewDatabaseIndex()
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
// write the snapshot, ensure we can close and load index from TSM
if err := e.WriteSnapshot(false); err != nil {
t.Fatalf("error writing snapshot: %s", err.Error())
}
// ensure we can close and load index from the TSM files
if err := e.Close(); err != nil {
t.Fatalf("error closing: %s", err.Error())
}
if err := e.Open(); err != nil {
t.Fatalf("error opening: %s", err.Error())
}
// Load metadata index.
index = tsdb.NewDatabaseIndex()
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
// write a new point and ensure we can close and load index from TSM and WAL
if err := e.WritePoints([]models.Point{p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// ensure we can close and load index from the TSM and WAL
if err := e.Close(); err != nil {
t.Fatalf("error closing: %s", err.Error())
}
if err := e.Open(); err != nil {
t.Fatalf("error opening: %s", err.Error())
}
// Load metadata index.
index = tsdb.NewDatabaseIndex()
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
}
func parsePoints(buf string) []models.Point {
points, err := models.ParsePointsString(buf)
if err != nil {

View File

@@ -275,6 +275,7 @@ func (l *WAL) Close() error {
if l.currentSegmentWriter != nil {
l.currentSegmentWriter.Close()
l.currentSegmentWriter = nil
}
return nil
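// For illustration only (not part of this commit): the failure mode the added nil
// assignment guards against. After Close, any code that gates on
// currentSegmentWriter being non-nil (as Close itself does above) now sees nil and
// skips the writer, rather than reusing a writer whose file has already been
// closed. The helper below is a hypothetical sketch.
func exampleSegmentWriterAfterClose(l *WAL) bool {
    _ = l.Close()
    // true with this fix; before it, the stale writer remained reachable here
    return l.currentSegmentWriter == nil
}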