242 lines
6.0 KiB
Go
242 lines
6.0 KiB
Go
package importer
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/shard"
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/server"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type importer struct {
|
|
MetaClient server.MetaClient
|
|
db string
|
|
dataDir string
|
|
replace bool
|
|
|
|
rpi *meta.RetentionPolicyInfo
|
|
log *zap.Logger
|
|
skipShard bool
|
|
currentShard uint64
|
|
sh *shard.Writer
|
|
sfile *tsdb.SeriesFile
|
|
sw *seriesWriter
|
|
buildTsi bool
|
|
seriesBuf []byte
|
|
}
|
|
|
|
// seriesBatchSize is not referenced in this part of the file.
// NOTE(review): presumably the number of series keys the series writer
// batches before flushing — confirm against its usage.
const seriesBatchSize = 1000
|
|
|
|
func newImporter(server server.Interface, db string, rp string, replace bool, buildTsi bool, log *zap.Logger) *importer {
|
|
i := &importer{MetaClient: server.MetaClient(), db: db, dataDir: server.TSDBConfig().Dir, replace: replace, buildTsi: buildTsi, log: log, skipShard: false}
|
|
|
|
if !buildTsi {
|
|
i.seriesBuf = make([]byte, 0, 2048)
|
|
}
|
|
return i
|
|
}
|
|
|
|
func (i *importer) Close() error {
|
|
el := errlist.NewErrorList()
|
|
if i.sh != nil {
|
|
el.Add(i.CloseShardGroup())
|
|
}
|
|
return el.Err()
|
|
}
|
|
|
|
func (i *importer) CreateDatabase(rp *meta.RetentionPolicySpec) error {
|
|
var rpi *meta.RetentionPolicyInfo
|
|
dbInfo := i.MetaClient.Database(i.db)
|
|
if dbInfo == nil {
|
|
return i.createDatabaseWithRetentionPolicy(rp)
|
|
}
|
|
|
|
rpi, err := i.MetaClient.RetentionPolicy(i.db, rp.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
nonmatchingRp := (rpi != nil) && ((rp.Duration != nil && rpi.Duration != *rp.Duration) ||
|
|
(rp.ReplicaN != nil && rpi.ReplicaN != *rp.ReplicaN) ||
|
|
(rpi.ShardGroupDuration != rp.ShardGroupDuration))
|
|
if nonmatchingRp {
|
|
return fmt.Errorf("retention policy %v already exists with different parameters", rp.Name)
|
|
} else {
|
|
if _, err := i.MetaClient.CreateRetentionPolicy(i.db, rp, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
i.rpi, err = i.MetaClient.RetentionPolicy(i.db, rp.Name)
|
|
return err
|
|
}
|
|
|
|
func (i *importer) createDatabaseWithRetentionPolicy(rp *meta.RetentionPolicySpec) error {
|
|
var err error
|
|
var dbInfo *meta.DatabaseInfo
|
|
if len(rp.Name) == 0 {
|
|
dbInfo, err = i.MetaClient.CreateDatabase(i.db)
|
|
} else {
|
|
dbInfo, err = i.MetaClient.CreateDatabaseWithRetentionPolicy(i.db, rp)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i.rpi = dbInfo.RetentionPolicy(rp.Name)
|
|
return nil
|
|
}
|
|
|
|
func (i *importer) StartShardGroup(start int64, end int64) error {
|
|
startTime := time.Unix(0, start)
|
|
endTime := time.Unix(0, end)
|
|
// Search for non-inclusive end time as shards with end_shard1 == start_shard2 are should not be considered overlapping
|
|
existingSg, err := i.MetaClient.NodeShardGroupsByTimeRange(i.db, i.rpi.Name, startTime, endTime.Add(-1))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
i.log.Info(fmt.Sprintf("Starting shard group %v-%v with %d existing shard groups", startTime, endTime, len(existingSg)))
|
|
|
|
var sgi *meta.ShardGroupInfo
|
|
var shardID uint64
|
|
|
|
shardsPath := i.shardPath(i.rpi.Name)
|
|
var shardPath string
|
|
if len(existingSg) > 0 {
|
|
sgi = &existingSg[0]
|
|
if len(sgi.Shards) > 1 {
|
|
return fmt.Errorf("multiple shards for the same owner %v and time range %v to %v", sgi.Shards[0].Owners, startTime, endTime)
|
|
}
|
|
if !sgi.StartTime.Equal(startTime) || !sgi.EndTime.Equal(endTime) {
|
|
return fmt.Errorf("Shard group time not matching, %v,%v != %v,%v", sgi.StartTime, sgi.EndTime, startTime, endTime)
|
|
}
|
|
|
|
shardID = sgi.Shards[0].ID
|
|
|
|
shardPath = filepath.Join(shardsPath, strconv.Itoa(int(shardID)))
|
|
_, err = os.Stat(shardPath)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
} else {
|
|
if i.replace {
|
|
if err := os.RemoveAll(shardPath); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if i.log != nil {
|
|
i.log.Error(fmt.Sprintf("shard %d already exists, skipping over new shard data", sgi.ID))
|
|
}
|
|
i.skipShard = true
|
|
return nil
|
|
}
|
|
}
|
|
} else {
|
|
sgi, err = i.MetaClient.CreateShardGroup(i.db, i.rpi.Name, startTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
shardID = sgi.Shards[0].ID
|
|
}
|
|
|
|
shardPath = filepath.Join(shardsPath, strconv.Itoa(int(shardID)))
|
|
if err = os.MkdirAll(shardPath, 0777); err != nil {
|
|
return err
|
|
}
|
|
|
|
i.skipShard = false
|
|
i.sh = shard.NewWriter(shardID, shardsPath)
|
|
i.currentShard = shardID
|
|
|
|
i.startSeriesFile()
|
|
return nil
|
|
}
|
|
|
|
func (i *importer) shardPath(rp string) string {
|
|
return filepath.Join(i.dataDir, i.db, rp)
|
|
}
|
|
|
|
func (i *importer) removeShardGroup(rp string, shardID uint64) error {
|
|
shardPath := i.shardPath(rp)
|
|
err := os.RemoveAll(filepath.Join(shardPath, strconv.Itoa(int(shardID))))
|
|
return err
|
|
}
|
|
|
|
func (i *importer) Write(key []byte, values tsm1.Values) error {
|
|
if i.skipShard {
|
|
return nil
|
|
}
|
|
if i.sh == nil {
|
|
return errors.New("importer not currently writing a shard")
|
|
}
|
|
i.sh.Write(key, values)
|
|
if i.sh.Err() != nil {
|
|
el := errlist.NewErrorList()
|
|
el.Add(i.sh.Err())
|
|
el.Add(i.CloseShardGroup())
|
|
el.Add(i.removeShardGroup(i.rpi.Name, i.currentShard))
|
|
i.sh = nil
|
|
i.currentShard = 0
|
|
return el.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i *importer) CloseShardGroup() error {
|
|
if i.skipShard {
|
|
i.skipShard = false
|
|
return nil
|
|
}
|
|
el := errlist.NewErrorList()
|
|
el.Add(i.closeSeriesFile())
|
|
i.sh.Close()
|
|
if i.sh.Err() != nil {
|
|
el.Add(i.sh.Err())
|
|
}
|
|
i.sh = nil
|
|
return el.Err()
|
|
}
|
|
|
|
func (i *importer) startSeriesFile() error {
|
|
dataPath := filepath.Join(i.dataDir, i.db)
|
|
shardPath := filepath.Join(i.dataDir, i.db, i.rpi.Name)
|
|
|
|
i.sfile = tsdb.NewSeriesFile(filepath.Join(dataPath, tsdb.SeriesFileDirectory))
|
|
if err := i.sfile.Open(); err != nil {
|
|
return err
|
|
}
|
|
|
|
var err error
|
|
if i.buildTsi {
|
|
i.sw, err = newTSI1SeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID()))
|
|
} else {
|
|
i.sw, err = newInMemSeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID()), i.seriesBuf)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i *importer) AddSeries(seriesKey []byte) error {
|
|
if i.skipShard {
|
|
return nil
|
|
}
|
|
return i.sw.AddSeries(seriesKey)
|
|
}
|
|
|
|
func (i *importer) closeSeriesFile() error {
|
|
return i.sw.Close()
|
|
}
|