Improve startup time of `inmem` index

This commit improves the startup time when using the `inmem` index by
ensuring that the series are created in the index and series file in
batches of 10000, rather than individually.

Fixes #9486.
pull/9488/head
Edd Robinson 2018-02-27 13:01:42 +00:00
parent 632e01e068
commit 96c0ecf618
4 changed files with 85 additions and 25 deletions

View File

@ -725,43 +725,76 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return nil
}
keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}
if err := e.addToIndexFromKey(key, fieldType); err != nil {
return err
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
return nil
}); err != nil {
return err
}
if len(keys) > 0 {
// Add remaining partial batch from FileStore.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
// load metadata from the Cache
if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error {
fieldType, err := entry.values.InfluxQLType()
if err != nil {
e.logger.Info("Error getting the data type of values for key",
zap.ByteString("key", key), zap.Error(err))
e.logger.Info("Error getting the data type of values for key", zap.ByteString("key", key), zap.Error(err))
}
if err := e.addToIndexFromKey(key, fieldType); err != nil {
return err
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
return nil
}); err != nil {
return err
}
if len(keys) > 0 {
// Add remaining partial batch from the Cache.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}
// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.Save(); err != nil {
return err
}
e.traceLogger.Info("Meta data index for shard loaded",
zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
e.traceLogger.Info("Meta data index for shard loaded", zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
return nil
}
@ -1013,6 +1046,8 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
// Merge and dedup all the series keys across each reader to reduce
// lock contention on the index.
keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)
merged := merge(readers...)
for v := range merged {
fieldType := BlockTypeToInfluxQLDataType(v.typ)
@ -1020,7 +1055,23 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
return fmt.Errorf("unknown block type: %v", v.typ)
}
if err := e.addToIndexFromKey(v.key, fieldType); err != nil {
keys = append(keys, v.key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
}
if len(keys) > 0 {
// Add remaining partial batch.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}
@ -1086,25 +1137,34 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
return tmp, nil
}
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) error {
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
name := tsdb.MeasurementFromSeriesKey(seriesKey)
// addToIndexFromKey will pull the measurement names, series keys, and field
// names from composite keys, and add them to the database index and measurement
// fields.
func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType) error {
var field []byte
names := make([][]byte, 0, len(keys))
tags := make([]models.Tags, 0, len(keys))
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldType); err != nil {
return err
for i := 0; i < len(keys); i++ {
// Replace tsm key format with index key format.
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := tsdb.MeasurementFromSeriesKey(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}
names = append(names, name)
tags = append(tags, models.ParseTags(keys[i]))
}
tags := models.ParseTags(seriesKey)
// Build in-memory index, if necessary.
if e.index.Type() == inmem.IndexName {
if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil {
if err := e.index.InitializeSeries(keys, names, tags); err != nil {
return err
}
} else {
if err := e.index.CreateSeriesIfNotExists(seriesKey, name, tags); err != nil {
if err := e.index.CreateSeriesListIfNotExists(keys, names, tags); err != nil {
return err
}
}

View File

@ -28,7 +28,7 @@ type Index interface {
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
InitializeSeries(key, name []byte, tags models.Tags) error
InitializeSeries(keys, names [][]byte, tags []models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error

View File

@ -1155,9 +1155,9 @@ func (idx *ShardIndex) SeriesN() int64 {
}
// InitializeSeries is called during start-up.
// This works the same as CreateSeriesIfNotExists except it ignore limit errors.
func (idx *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, true)
// This works the same as CreateSeriesListIfNotExists except it ignores limit errors.
func (idx *ShardIndex) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
	// Delegate to the shared index for this shard's ID and series set.
	// The final true argument tells the index to ignore series-limit
	// errors, which is what distinguishes start-up initialization from
	// CreateSeriesListIfNotExists (per the doc comment above).
	return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, keys, names, tags, &idx.opt, true)
}
// CreateSeriesIfNotExists creates the provided series on the index if it is not

View File

@ -554,7 +554,7 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
}
// InitializeSeries is a no-op here; start-up series initialization is only needed by the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
	// Nothing to do: bulk series initialization at start-up is only
	// required by the in-memory (inmem) index implementation; this
	// index builds its series state elsewhere.
	return nil
}