influxdb/cmd/influx_tools/generate/exec/generator.go

189 lines
4.2 KiB
Go

package exec
import (
"context"
"fmt"
"path"
"path/filepath"
"strconv"
"sync"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/shard"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)
// Generator writes generated time-series data into a database's shard
// directory, optionally building a TSI1 index per shard.
type Generator struct {
	// Concurrency bounds how many shards are written (and, later, how many
	// series-file partitions are compacted) in parallel.
	Concurrency int
	// BuildTSI selects a TSI1 on-disk index per shard; when false, series
	// are registered only in the shared series file.
	BuildTSI bool

	// sfile is the database-wide series file; opened in Run and shared by
	// every shard writer.
	sfile *tsdb.SeriesFile
}
// Run generates one shard of TSM data per shard group, writing up to
// g.Concurrency shards in parallel, then compacts the partitions of the
// shared series file. groups and gens are parallel slices: gens[n] produces
// the series written into groups[n]. shardPath is the database's shard
// directory; its parent is used to locate the series file. All errors are
// collected and combined into the returned error.
//
// ctx is accepted but not currently consulted for cancellation.
func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []gen.SeriesGenerator) (err error) {
	// limit is a counting semaphore bounding concurrent shard writes and,
	// later, partition compactions.
	limit := make(chan struct{}, g.Concurrency)
	for i := 0; i < g.Concurrency; i++ {
		limit <- struct{}{}
	}

	var (
		wg   sync.WaitGroup
		errs errlist.ErrorList
		ch   = make(chan error, len(groups))
	)

	// The series file lives in the parent of the shard directory and is
	// shared by every shard of the database.
	dbPath := path.Dir(shardPath)
	g.sfile = tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory))
	if err := g.sfile.Open(); err != nil {
		return err
	}
	defer g.sfile.Close()
	// Compactions are run explicitly after all shards have been written.
	g.sfile.DisableCompactions()

	wg.Add(len(groups))
	for i := 0; i < len(groups); i++ {
		go func(n int) {
			<-limit
			defer func() {
				wg.Done()
				limit <- struct{}{}
			}()

			sgi := &groups[n]
			// Exactly one shard per group is supported. Guard the empty
			// case explicitly: the original code indexed sgi.Shards[0]
			// unconditionally and would panic on a group with no shards.
			if len(sgi.Shards) == 0 {
				ch <- fmt.Errorf("shard group %d has no shards", sgi.ID)
				return
			}
			if len(sgi.Shards) > 1 {
				ch <- fmt.Errorf("multiple shards for the same owner %v", sgi.Shards[0].Owners)
				return
			}

			id := sgi.Shards[0].ID

			var (
				idx seriesIndex
				ti  *tsi1.Index
			)
			if g.BuildTSI {
				ti = tsi1.NewIndex(g.sfile, database, tsi1.WithPath(filepath.Join(shardPath, strconv.Itoa(int(id)), "index")))
				if err := ti.Open(); err != nil {
					ch <- fmt.Errorf("error opening TSI1 index %d: %s", id, err.Error())
					return
				}
				idx = ti
			} else {
				// Without TSI, series are registered directly in the
				// series file via the adapter.
				idx = &seriesFileAdapter{sf: g.sfile, buf: make([]byte, 0, 2048)}
			}

			if err := g.writeShard(idx, gens[n], id, shardPath); err != nil {
				ch <- fmt.Errorf("error writing shard %d: %s", id, err.Error())
			}

			if ti != nil {
				ti.Compact()
				ti.Wait()
				if err := ti.Close(); err != nil {
					ch <- fmt.Errorf("error compacting TSI1 index %d: %s", id, err.Error())
				}
			}
		}(i)
	}

	wg.Wait()
	close(ch)
	for e := range ch {
		errs.Add(e)
	}

	// Second phase: compact every series-file partition, reusing the same
	// semaphore and a fresh error channel.
	parts := g.sfile.Partitions()
	wg.Add(len(parts))
	ch = make(chan error, len(parts))

	for i := range parts {
		go func(n int) {
			<-limit
			defer func() {
				wg.Done()
				limit <- struct{}{}
			}()

			p := parts[n]
			c := tsdb.NewSeriesPartitionCompactor()
			if err := c.Compact(p); err != nil {
				ch <- fmt.Errorf("error compacting series partition %d: %s", n, err.Error())
			}
		}(i)
	}

	wg.Wait()
	close(ch)
	for e := range ch {
		errs.Add(e)
	}

	return errs.Err()
}
// seriesBatchSize specifies the number of series keys passed to the index
// in a single CreateSeriesListIfNotExists call by writeShard.
const seriesBatchSize = 1000
// writeShard writes all series produced by sg into a single TSM shard with
// the given id under path, registering each series with idx in batches of
// seriesBatchSize.
func (g *Generator) writeShard(idx seriesIndex, sg gen.SeriesGenerator, id uint64, path string) error {
	sw := shard.NewWriter(id, path)
	defer sw.Close()

	var (
		keys  [][]byte
		names [][]byte
		tags  []models.Tags
	)

	for sg.Next() {
		seriesKey := sg.Key()
		keys = append(keys, seriesKey)
		names = append(names, sg.Name())
		tags = append(tags, sg.Tags())

		if len(keys) == seriesBatchSize {
			if err := idx.CreateSeriesListIfNotExists(keys, names, tags, tsdb.NoopStatsTracker()); err != nil {
				return err
			}
			// Reset length, retaining capacity for the next batch.
			keys = keys[:0]
			names = names[:0]
			tags = tags[:0]
		}

		vg := sg.TimeValuesGenerator()

		key := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field()))
		for vg.Next() {
			sw.WriteV(key, vg.Values())
		}

		if err := sw.Err(); err != nil {
			return err
		}
	}

	// Flush the remaining partial batch. BUG FIX: the previous condition was
	// len(keys) > seriesBatchSize, which can never hold (batches are flushed
	// and reset exactly at seriesBatchSize), so the final 1..999 series were
	// silently never registered with the index.
	if len(keys) > 0 {
		if err := idx.CreateSeriesListIfNotExists(keys, names, tags, tsdb.NoopStatsTracker()); err != nil {
			return err
		}
	}

	return nil
}
// seriesIndex abstracts the one index operation writeShard needs: bulk
// registration of series. Satisfied by *tsi1.Index directly and by
// seriesFileAdapter when TSI is disabled.
type seriesIndex interface {
	CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) error
}
// seriesFileAdapter adapts *tsdb.SeriesFile to the seriesIndex interface
// for the non-TSI path.
type seriesFileAdapter struct {
	sf *tsdb.SeriesFile
	// buf is preallocated scratch space (currently unused by the adapter's
	// only method — TODO confirm whether it can be removed).
	buf []byte
}
// CreateSeriesListIfNotExists implements seriesIndex by delegating to the
// underlying series file. The keys argument is not forwarded — the series
// file derives its own keys from names and tagsSlice — and the series IDs
// returned by the series file are discarded.
func (s *seriesFileAdapter) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) error {
	_, err := s.sf.CreateSeriesListIfNotExists(names, tagsSlice, tracker)
	return err
}