influxdb/cmd/influx_tools/importer/importer.go

242 lines
6.0 KiB
Go

package importer
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/shard"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"go.uber.org/zap"
)
// importer writes pre-extracted shard data into a target database, creating
// the database, retention policy, and shard groups through the meta client
// and streaming TSM data through a shard.Writer.
type importer struct {
MetaClient server.MetaClient // meta store client used for database/RP/shard-group operations
db string // target database name
dataDir string // TSDB data directory root (from server config)
replace bool // if true, existing on-disk shards are deleted and rewritten
rpi *meta.RetentionPolicyInfo // retention policy being imported into; set by CreateDatabase
log *zap.Logger
skipShard bool // true while the current shard already exists and replace is false; writes become no-ops
currentShard uint64 // ID of the shard currently being written
sh *shard.Writer // writer for the current shard; nil when no shard group is open
sfile *tsdb.SeriesFile // series file opened per shard group in startSeriesFile
sw *seriesWriter // TSI1- or in-mem-index series writer for the current shard
buildTsi bool // choose TSI1 index (true) vs in-mem index (false)
seriesBuf []byte // scratch buffer for the in-mem series writer; nil when buildTsi
}
// seriesBatchSize is presumably the number of series keys handled per batch by
// the series writer; it is not referenced in this view — confirm usage there.
const seriesBatchSize = 1000
func newImporter(server server.Interface, db string, rp string, replace bool, buildTsi bool, log *zap.Logger) *importer {
i := &importer{MetaClient: server.MetaClient(), db: db, dataDir: server.TSDBConfig().Dir, replace: replace, buildTsi: buildTsi, log: log, skipShard: false}
if !buildTsi {
i.seriesBuf = make([]byte, 0, 2048)
}
return i
}
// Close releases importer resources, closing any shard group still open.
func (i *importer) Close() error {
	errs := errlist.NewErrorList()
	if i.sh != nil {
		errs.Add(i.CloseShardGroup())
	}
	return errs.Err()
}
// CreateDatabase ensures the target database and retention policy exist and
// records the resulting RetentionPolicyInfo in i.rpi. If the database is
// missing it is created (with the policy). If a policy with the same name
// already exists, its duration, replication factor, and shard-group duration
// must match the requested spec, otherwise an error is returned.
func (i *importer) CreateDatabase(rp *meta.RetentionPolicySpec) error {
	if dbInfo := i.MetaClient.Database(i.db); dbInfo == nil {
		return i.createDatabaseWithRetentionPolicy(rp)
	}

	rpi, err := i.MetaClient.RetentionPolicy(i.db, rp.Name)
	if err != nil {
		return err
	}

	// An existing policy must agree with the requested spec on every
	// parameter the spec pins down.
	if rpi != nil &&
		((rp.Duration != nil && rpi.Duration != *rp.Duration) ||
			(rp.ReplicaN != nil && rpi.ReplicaN != *rp.ReplicaN) ||
			rpi.ShardGroupDuration != rp.ShardGroupDuration) {
		return fmt.Errorf("retention policy %v already exists with different parameters", rp.Name)
	}

	// Matching (or absent) policy: create is effectively idempotent here.
	if _, err := i.MetaClient.CreateRetentionPolicy(i.db, rp, false); err != nil {
		return err
	}

	i.rpi, err = i.MetaClient.RetentionPolicy(i.db, rp.Name)
	return err
}
// createDatabaseWithRetentionPolicy creates the database, attaching the given
// retention policy when the spec names one, and stores the resulting policy
// info on the importer.
func (i *importer) createDatabaseWithRetentionPolicy(rp *meta.RetentionPolicySpec) error {
	var (
		dbInfo *meta.DatabaseInfo
		err    error
	)
	if rp.Name == "" {
		dbInfo, err = i.MetaClient.CreateDatabase(i.db)
	} else {
		dbInfo, err = i.MetaClient.CreateDatabaseWithRetentionPolicy(i.db, rp)
	}
	if err != nil {
		return err
	}
	i.rpi = dbInfo.RetentionPolicy(rp.Name)
	return nil
}
// StartShardGroup opens a shard for the time range [start, end) (unix nanos),
// reusing an existing shard group when one covers the range exactly or
// creating a new one otherwise. When the shard already exists on disk it is
// either deleted (replace mode) or skipped entirely (skipShard is set and all
// subsequent writes become no-ops until CloseShardGroup).
//
// Fixes: the error from startSeriesFile was previously discarded, so a failed
// series-file/index open was reported as success; it is now propagated.
func (i *importer) StartShardGroup(start int64, end int64) error {
	startTime := time.Unix(0, start)
	endTime := time.Unix(0, end)

	// Search with a non-inclusive end time: shards where
	// end_shard1 == start_shard2 must not be considered overlapping.
	existingSg, err := i.MetaClient.NodeShardGroupsByTimeRange(i.db, i.rpi.Name, startTime, endTime.Add(-1))
	if err != nil {
		return err
	}

	i.log.Info(fmt.Sprintf("Starting shard group %v-%v with %d existing shard groups", startTime, endTime, len(existingSg)))

	var sgi *meta.ShardGroupInfo
	var shardID uint64
	shardsPath := i.shardPath(i.rpi.Name)

	if len(existingSg) > 0 {
		sgi = &existingSg[0]
		if len(sgi.Shards) > 1 {
			return fmt.Errorf("multiple shards for the same owner %v and time range %v to %v", sgi.Shards[0].Owners, startTime, endTime)
		}
		if !sgi.StartTime.Equal(startTime) || !sgi.EndTime.Equal(endTime) {
			return fmt.Errorf("shard group time not matching, %v,%v != %v,%v", sgi.StartTime, sgi.EndTime, startTime, endTime)
		}
		shardID = sgi.Shards[0].ID

		shardPath := filepath.Join(shardsPath, strconv.Itoa(int(shardID)))
		if _, err = os.Stat(shardPath); err != nil {
			if !os.IsNotExist(err) {
				return err
			}
			// Directory absent: fall through and create it below.
		} else if i.replace {
			// Replace mode: remove the on-disk shard before rewriting it.
			if err := os.RemoveAll(shardPath); err != nil {
				return err
			}
		} else {
			if i.log != nil {
				i.log.Error(fmt.Sprintf("shard %d already exists, skipping over new shard data", sgi.ID))
			}
			i.skipShard = true
			return nil
		}
	} else {
		if sgi, err = i.MetaClient.CreateShardGroup(i.db, i.rpi.Name, startTime); err != nil {
			return err
		}
		shardID = sgi.Shards[0].ID
	}

	if err = os.MkdirAll(filepath.Join(shardsPath, strconv.Itoa(int(shardID))), 0777); err != nil {
		return err
	}

	i.skipShard = false
	i.sh = shard.NewWriter(shardID, shardsPath)
	i.currentShard = shardID

	return i.startSeriesFile()
}
// shardPath returns the on-disk directory holding shards for retention policy rp.
func (i *importer) shardPath(rp string) string {
	dbDir := filepath.Join(i.dataDir, i.db)
	return filepath.Join(dbDir, rp)
}
// removeShardGroup deletes the on-disk directory of shardID under retention
// policy rp, used to discard a partially written shard after a write failure.
func (i *importer) removeShardGroup(rp string, shardID uint64) error {
	dir := filepath.Join(i.shardPath(rp), strconv.Itoa(int(shardID)))
	return os.RemoveAll(dir)
}
// Write appends values for the given series key to the open shard. While a
// shard is being skipped this is a no-op. On a shard writer error the shard
// group is closed and its partial on-disk data removed; the combined errors
// are returned.
func (i *importer) Write(key []byte, values tsm1.Values) error {
	if i.skipShard {
		return nil
	}
	if i.sh == nil {
		return errors.New("importer not currently writing a shard")
	}

	i.sh.Write(key, values)
	if err := i.sh.Err(); err != nil {
		errs := errlist.NewErrorList()
		errs.Add(err)
		errs.Add(i.CloseShardGroup())
		errs.Add(i.removeShardGroup(i.rpi.Name, i.currentShard))
		i.sh = nil
		i.currentShard = 0
		return errs.Err()
	}
	return nil
}
// CloseShardGroup finishes the current shard: it closes the series writer and
// the shard writer, collecting any errors. If the shard was being skipped it
// only clears the skip flag.
func (i *importer) CloseShardGroup() error {
	if i.skipShard {
		i.skipShard = false
		return nil
	}

	errs := errlist.NewErrorList()
	errs.Add(i.closeSeriesFile())
	i.sh.Close()
	if err := i.sh.Err(); err != nil {
		errs.Add(err)
	}
	i.sh = nil
	return errs.Err()
}
// startSeriesFile opens the database's series file and creates the series
// writer for the shard currently being written — TSI1-index based when
// buildTsi is set, in-mem-index based otherwise.
func (i *importer) startSeriesFile() error {
	dataPath := filepath.Join(i.dataDir, i.db)
	shardPath := filepath.Join(i.dataDir, i.db, i.rpi.Name)

	i.sfile = tsdb.NewSeriesFile(filepath.Join(dataPath, tsdb.SeriesFileDirectory))
	if err := i.sfile.Open(); err != nil {
		return err
	}

	var err error
	if i.buildTsi {
		i.sw, err = newTSI1SeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID()))
	} else {
		i.sw, err = newInMemSeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID()), i.seriesBuf)
	}
	return err
}
// AddSeries registers a series key with the series writer. It is a no-op
// while the current shard is being skipped.
func (i *importer) AddSeries(seriesKey []byte) error {
	if !i.skipShard {
		return i.sw.AddSeries(seriesKey)
	}
	return nil
}
// closeSeriesFile closes the series writer for the current shard; presumably
// this also releases the underlying series-file resources — confirm in the
// seriesWriter implementation.
func (i *importer) closeSeriesFile() error {
return i.sw.Close()
}