// Source: influxdb/cmd/influxd/generate/generator.go (162 lines, 3.6 KiB, Go)

package generate
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"github.com/influxdata/influxdb/cmd/influxd/generate/internal/shard"
"github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/tsi1"
"github.com/influxdata/influxdb/tsdb/tsm1"
)
// Generator writes generated time series data into an on-disk storage
// engine layout: a series file, a TSI1 index, and TSM shard files.
type Generator struct {
	// sfile is the series file opened by Run under the engine path.
	sfile *tsdb.SeriesFile

	// Clean specifies whether to clean any of the data related files
	Clean CleanLevel
}
// Run generates series data beneath path/engine using gen, optionally
// removing existing data first according to g.Clean. It opens the series
// file and TSI1 index, writes TSM shard files, compacts the index and the
// series-file partitions, and returns the list of shard files written.
// On a non-nil error the returned slice is nil.
func (g *Generator) Run(ctx context.Context, path string, gen gen.SeriesGenerator) ([]string, error) {
	path = filepath.Join(path, "engine")
	config := storage.NewConfig()

	switch g.Clean {
	// Both clean levels currently remove the entire engine directory;
	// the duplicated case bodies were merged into one arm.
	case CleanLevelTSM, CleanLevelAll:
		if err := os.RemoveAll(path); err != nil {
			return nil, err
		}
	}

	g.sfile = tsdb.NewSeriesFile(config.GetSeriesFilePath(path))
	if err := g.sfile.Open(ctx); err != nil {
		return nil, err
	}
	defer g.sfile.Close()
	// Compactions are run explicitly below, once all series are written.
	g.sfile.DisableCompactions()

	ti := tsi1.NewIndex(g.sfile, config.Index, tsi1.WithPath(config.GetIndexPath(path)))
	if err := ti.Open(ctx); err != nil {
		return nil, fmt.Errorf("error opening TSI1 index: %s", err.Error())
	}

	files, err := g.writeShard(ti, gen, config.GetEnginePath(path))
	if err != nil {
		// Close the index on the error path too (it was previously leaked
		// here); the write error takes precedence over any close error.
		ti.Close()
		return nil, fmt.Errorf("error writing data: %s", err.Error())
	}

	ti.Compact()
	ti.Wait()
	if err := ti.Close(); err != nil {
		return nil, fmt.Errorf("error compacting TSI1 index: %s", err.Error())
	}

	var (
		wg   sync.WaitGroup
		errs errors.List
	)

	// Compact every series-file partition concurrently, bounded to one
	// worker per CPU. The channel is sized so no send can block.
	parts := g.sfile.Partitions()
	wg.Add(len(parts))
	ch := make(chan error, len(parts))
	limit := limiter.NewFixed(runtime.NumCPU())

	for i := range parts {
		go func(n int) {
			limit.Take()
			defer func() {
				wg.Done()
				limit.Release()
			}()

			p := parts[n]
			c := tsdb.NewSeriesPartitionCompactor()
			if _, err := c.Compact(p); err != nil {
				ch <- fmt.Errorf("error compacting series partition %d: %s", n, err.Error())
			}
		}(i)
	}
	wg.Wait()

	close(ch)
	for e := range ch {
		errs.Append(e)
	}

	if err := errs.Err(); err != nil {
		return nil, err
	}

	return files, nil
}
// seriesBatchSize specifies the number of series keys passed to the index
// in a single CreateSeriesListIfNotExists call.
const seriesBatchSize = 1000
// writeShard streams the series produced by sg into TSM shard files under
// path, registering each series with idx in batches of seriesBatchSize.
// It returns the list of files the shard writer produced.
func (g *Generator) writeShard(idx *tsi1.Index, sg gen.SeriesGenerator, path string) ([]string, error) {
	if err := os.MkdirAll(path, 0777); err != nil {
		return nil, err
	}

	w, err := shard.NewWriter(path, shard.AutoNumber())
	if err != nil {
		return nil, err
	}
	defer w.Close()

	// Series metadata is accumulated here and flushed to the index in
	// fixed-size batches rather than one call per series.
	batch := &tsdb.SeriesCollection{
		Keys:  make([][]byte, 0, seriesBatchSize),
		Names: make([][]byte, 0, seriesBatchSize),
		Tags:  make([]models.Tags, 0, seriesBatchSize),
		Types: make([]models.FieldType, 0, seriesBatchSize),
	}

	// flush registers the accumulated batch with the index.
	flush := func() error {
		return idx.CreateSeriesListIfNotExists(batch)
	}

	for sg.Next() {
		seriesKey := sg.Key()
		batch.Keys = append(batch.Keys, seriesKey)
		batch.Names = append(batch.Names, sg.ID())
		batch.Tags = append(batch.Tags, sg.Tags())
		batch.Types = append(batch.Types, sg.FieldType())

		if batch.Length() == seriesBatchSize {
			if err := flush(); err != nil {
				return nil, err
			}
			batch.Truncate(0)
		}

		// Emit every timestamped value for this series under its field key.
		tv := sg.TimeValuesGenerator()
		fieldKey := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field()))
		for tv.Next() {
			w.WriteV(fieldKey, tv.Values())
		}
		if err := w.Err(); err != nil {
			return nil, err
		}
	}

	// Flush whatever is left of the final, partially filled batch.
	if batch.Length() > 0 {
		if err := flush(); err != nil {
			return nil, err
		}
	}

	return w.Files(), nil
}