Update TSM metadata loading and write snapshot

* Update WriteSnapshot to always call synchronously
* Update LoadMetadataIndex to load WAL metadata from the cache
pull/4990/head
Paul Dix 2015-12-04 16:03:17 -05:00
parent eafb703afc
commit b0f3dcc8cc
3 changed files with 53 additions and 91 deletions

View File

@ -104,7 +104,7 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
// InfluxQLType returns the influxql.DataType the values map to.
func (a Values) InfluxQLType() (influxql.DataType, error) {
if len(a) == 0 {
return influxql.Unknown, fmt.Errorf("no values in collection")
return influxql.Unknown, fmt.Errorf("no values to infer type")
}
switch a[0].Value().(type) {

View File

@ -39,12 +39,6 @@ type DevEngine struct {
MaxPointsPerBlock int
CacheFlushMemorySizeThreshold uint64
// walSeries is a temporary holder on startup for series that appear in the WAL
walSeries map[string]string
// walFields is a temporary holder on startup for measurement fields defined in the WAL
walFields map[string]map[string]influxql.DataType
}
// NewDevEngine returns a new instance of Engine.
@ -135,79 +129,78 @@ func (e *DevEngine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
keys := e.FileStore.Keys()
keysLoaded := make(map[string]bool)
for _, k := range keys {
seriesKey, field := seriesAndFieldFromCompositeKey(k)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)
typ, err := e.FileStore.Type(k)
if err != nil {
return err
}
mf := measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
}
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
}
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
if err := e.addToIndexFromKey(k, fieldType, index, measurementFields); err != nil {
return err
}
_, tags, err := models.ParseKey(seriesKey)
if err == nil {
keysLoaded[k] = true
}
// load metadata from the Cache
e.Cache.mu.Lock() // shouldn't need the lock, but just to be safe
defer e.Cache.mu.Unlock()
for key, entry := range e.Cache.store {
if keysLoaded[key] {
continue
}
fieldType, err := entry.values.InfluxQLType()
if err != nil {
log.Printf("error getting the data type of values for key %s: %s", key, err.Error())
continue
}
if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil {
return err
}
s := tsdb.NewSeries(seriesKey, tags)
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
}
// load the measurement and field metadata from the WAL
for measurement, fields := range e.walFields {
m := index.CreateMeasurementIndexIfNotExists(measurement)
return nil
}
mf := measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
}
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *DevEngine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
seriesKey, field := seriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
for field, fieldType := range fields {
m.SetFieldName(field)
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err
}
m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)
mf := measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
}
e.walFields = nil
// load the series from the WAL
for seriesKey, measurement := range e.walSeries {
_, tags, err := models.ParseKey(seriesKey)
if err == nil {
return err
}
s := tsdb.NewSeries(seriesKey, tags)
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err
}
e.walSeries = nil
_, tags, err := models.ParseKey(seriesKey)
if err == nil {
return err
}
s := tsdb.NewSeries(seriesKey, tags)
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
return nil
}
@ -259,7 +252,7 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) {
func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *DevEngine) WriteSnapshot(async bool) error {
func (e *DevEngine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot
closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
@ -284,11 +277,6 @@ func (e *DevEngine) WriteSnapshot(async bool) error {
return err
}
if async {
go e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
return nil
}
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
}
@ -323,7 +311,7 @@ func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache
func (e *DevEngine) compact() {
for {
if e.Cache.Size() > e.CacheFlushMemorySizeThreshold {
err := e.WriteSnapshot(true)
err := e.WriteSnapshot()
if err != nil {
e.logger.Printf("error writing snapshot: %v", err)
}
@ -366,9 +354,6 @@ func (e *DevEngine) reloadCache() error {
return err
}
e.walSeries = make(map[string]string)
e.walFields = make(map[string]map[string]influxql.DataType)
for _, fn := range files {
f, err := os.Open(fn)
if err != nil {
@ -391,7 +376,6 @@ func (e *DevEngine) reloadCache() error {
if err := e.Cache.WriteMulti(t.Values); err != nil {
return err
}
e.loadWALMetadata(t.Values)
case *DeleteWALEntry:
// FIXME: Implement this
// if err := e.Cache.Delete(t.Keys); err != nil {
@ -403,28 +387,6 @@ func (e *DevEngine) reloadCache() error {
return nil
}
// loadWALMetadata will put the series key, measurement name, field name and type into the temporary wal maps of metadata
func (e *DevEngine) loadWALMetadata(vals map[string][]Value) {
for k, v := range vals {
seriesKey, field := seriesAndFieldFromCompositeKey(k)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
fieldType, err := Values(v).InfluxQLType()
if err != nil {
continue
}
e.walSeries[seriesKey] = measurement
m := e.walFields[measurement]
if m == nil {
m = make(map[string]influxql.DataType)
e.walFields[measurement] = m
}
m[field] = fieldType
}
}
func (e *DevEngine) Read(key string, t time.Time) ([]Value, error) {
e.mu.RLock()
defer e.mu.RUnlock()

View File

@ -260,7 +260,7 @@ func TestDevEngine_LoadMetadataIndex(t *testing.T) {
}
// write the snapshot, ensure we can close and load index from TSM
if err := e.WriteSnapshot(false); err != nil {
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("error writing snapshot: %s", err.Error())
}