Merge pull request #17016 from influxdata/er-bulk-import

feat(storage): prototype 1.x–2.x migration tooling
pull/17336/head
Edd Robinson 2020-03-18 17:57:26 +00:00 committed by GitHub
commit d96cbd4f74
5 changed files with 1065 additions and 0 deletions

@@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/generate"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/cmd/influxd/migrate"
"github.com/influxdata/influxdb/cmd/influxd/restore"
_ "github.com/influxdata/influxdb/query/builtin"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
@@ -48,6 +49,7 @@ func init() {
rootCmd.AddCommand(generate.Command)
rootCmd.AddCommand(inspect.NewCommand())
rootCmd.AddCommand(restore.Command)
rootCmd.AddCommand(migrate.Command)
// TODO: this should be removed in the future: https://github.com/influxdata/influxdb/issues/16220
if os.Getenv("QUERY_TRACING") == "1" {

@@ -0,0 +1,212 @@
// Package migrate provides a tool to help migrate data from InfluxDB 1.x to 2.x
package migrate
import (
"errors"
"fmt"
"os"
"os/user"
"path/filepath"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/migrate"
"github.com/spf13/cobra"
)
var Command = &cobra.Command{
Use: "migrate",
Short: "Migrate from InfluxDB >= 1.7.x to InfluxDB 2.x",
Long: `NOTE!
This tool is in a very early prototype stage. It can corrupt your 2.x TSM data.
Please see the following issues for progress towards making this tool production
ready:
- Ensure retention policy duration carried over to 2.x bucket: https://github.com/influxdata/influxdb/issues/17257
- Support migrating arbitrary time-ranges: https://github.com/influxdata/influxdb/issues/17249
- Ensure hot shards not migrated by default: https://github.com/influxdata/influxdb/issues/17250
- Handle cases where multiple 1.x shards have different field types for same series: https://github.com/influxdata/influxdb/issues/17251
This tool allows an operator to migrate their TSM data from an OSS 1.x server
into an OSS 2.x server in an offline manner.
It is very important that the 2.x server is not running while this tool runs.
Ideally the 1.x server should also be stopped, or at the very least it should not
be writing to any of the shards that will be migrated.
`,
Args: cobra.ExactArgs(0),
RunE: migrateE,
}
var flags struct {
basePath1x string // base path of 1.x installation
db string
rp string
from string // migrate only data at or after this time (RFC3339Nano format)
to string // migrate only data at or before this time (RFC3339Nano format)
migrateHot bool // migrate hot shards (can leave data behind)
basePath2x string // base path of 2.x installation (defaults to ~/.influxdbv2)
destOrg string // destination 2.x organisation (base-16 format)
dryRun bool // enable dry-run mode (don't do any migration)
verbose bool // enable verbose logging
}
// influx1Dir retrieves the InfluxDB 1.x directory.
func influx1Dir() (string, error) {
var dir string
// By default, store meta and data files in the current user's home directory.
u, err := user.Current()
if err == nil {
dir = u.HomeDir
} else if home := os.Getenv("HOME"); home != "" {
dir = home
} else {
wd, err := os.Getwd()
if err != nil {
return "", err
}
dir = wd
}
dir = filepath.Join(dir, ".influxdb")
return dir, nil
}
func init() {
v1Dir, err := influx1Dir()
if err != nil {
panic(fmt.Errorf("failed to determine default InfluxDB 1.x directory: %s", err))
}
v2Dir, err := fs.InfluxDir()
if err != nil {
panic(fmt.Errorf("failed to determine default InfluxDB 2.x directory: %s", err))
}
opts := []cli.Opt{
{
DestP: &flags.basePath1x,
Flag: "influxdb-1x-path",
Default: v1Dir,
Desc: "path to 1.x InfluxDB",
},
{
DestP: &flags.basePath2x,
Flag: "influxdb-2x-path",
Default: v2Dir,
Desc: "path to 2.x InfluxDB",
},
{
DestP: &flags.db,
Flag: "db",
Default: "",
Desc: "only import the provided 1.x database",
},
{
DestP: &flags.rp,
Flag: "rp",
Default: "",
Desc: "only import the provided 1.x retention policy. --db must be set",
},
{
DestP: &flags.from,
Flag: "from",
Default: "",
Desc: "earliest point to import",
},
{
DestP: &flags.to,
Flag: "to",
Default: "",
Desc: "latest point to import",
},
{
DestP: &flags.destOrg,
Flag: "org-id",
Default: "",
Desc: "destination 2.x organization id (required)",
},
{
DestP: &flags.dryRun,
Flag: "dry-run",
Default: false,
Desc: "simulate migration without running it",
},
{
DestP: &flags.migrateHot,
Flag: "migrate-hot-shards",
Default: false,
Desc: "migrate all shards including hot ones. Can leave unsnapshotted data behind",
},
{
DestP: &flags.verbose,
Flag: "verbose",
Default: false,
Desc: "enable verbose logging",
},
}
cli.BindOptions(Command, opts)
}
func migrateE(cmd *cobra.Command, args []string) error {
if flags.destOrg == "" {
return errors.New("destination organization must be set")
} else if flags.rp != "" && flags.db == "" {
return errors.New("source database empty. Cannot filter by retention policy")
}
destOrg, err := influxdb.IDFromString(flags.destOrg)
if err != nil {
return err
}
fromNano := models.MinNanoTime
if flags.from != "" {
from, err := time.Parse(time.RFC3339Nano, flags.from)
if err != nil {
return err
}
fromNano = from.UnixNano()
// TODO: enable this feature...
_ = fromNano
return errors.New("--from flag is currently unsupported; only all-time can be migrated")
}
toNano := models.MaxNanoTime
if flags.to != "" {
to, err := time.Parse(time.RFC3339Nano, flags.to)
if err != nil {
return err
}
toNano = to.UnixNano()
// TODO: enable this feature...
_ = toNano
return errors.New("--to flag is currently unsupported; only all-time can be migrated")
}
// TODO: enable this feature...
if flags.migrateHot {
return errors.New("--migrate-hot-shards flag is currently unsupported; fully compact all shards before migrating")
}
migrator := migrate.NewMigrator(migrate.Config{
SourcePath: flags.basePath1x,
DestPath: flags.basePath2x,
From: fromNano,
To: toNano,
MigrateHotShard: flags.migrateHot,
Stdout: os.Stdout,
VerboseLogging: flags.verbose,
DestOrg: *destOrg,
DryRun: flags.dryRun,
})
return migrator.Process1xShards(flags.db, flags.rp)
}
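
For orientation (editorial note, not part of this diff), the flags registered above imply an invocation along the lines of '$ influxd migrate --org-id <org id> --db <database> --dry-run' to preview the work, followed by the same command without --dry-run to perform it; as the help text warns, the 2.x server (and ideally the 1.x server) should be stopped first.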

pkg/bufio/bufio.go (new file, 54 lines)

@@ -0,0 +1,54 @@
package bufio
import (
"bufio"
"errors"
"io"
)
// Writer implements buffering for an io.WriteCloser object.
//
// Writer provides access to a buffered writer that can be closed. Closing the
// writer will result in all remaining data being flushed from the buffer before
// the underlying WriteCloser is closed.
type Writer struct {
c io.Closer
bw *bufio.Writer
}
// NewWriter initialises a new Writer with the default buffer size.
func NewWriter(w io.WriteCloser) *Writer {
return &Writer{
c: w,
bw: bufio.NewWriter(w),
}
}
// NewWriterSize initialises a new Writer with the provided buffer size.
func NewWriterSize(w io.WriteCloser, sz int) *Writer {
return &Writer{
c: w,
bw: bufio.NewWriterSize(w, sz),
}
}
// Write writes the contents of p into the buffer. It returns the number of
// bytes written. If fewer than len(p) bytes are written, an error is returned
// explaining why.
func (w *Writer) Write(p []byte) (int, error) {
return w.bw.Write(p)
}
// Close closes the Writer, flushing any remaining data from the buffer.
func (w *Writer) Close() error {
if w.bw == nil || w.c == nil {
return errors.New("cannot close nil Writer")
}
if err := w.bw.Flush(); err != nil {
w.c.Close()
return err
}
return w.c.Close()
}
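
A minimal usage sketch (editorial, not part of this diff; the helper name and file path are hypothetical) showing the intended call pattern, where Close flushes any buffered data before the underlying file is closed:

package bufio_test

import (
	"os"

	"github.com/influxdata/influxdb/pkg/bufio"
)

// writeBuffered wraps a newly created file in the buffered WriteCloser and
// relies on Close to flush the remaining bytes before closing the file.
func writeBuffered(path string, data []byte) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	w := bufio.NewWriterSize(f, 1<<20) // 1 MiB buffer
	if _, err := w.Write(data); err != nil {
		w.Close() // best-effort flush/close on write error
		return err
	}
	return w.Close()
}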

tsdb/migrate/migrate.go (new file, 687 lines)

@@ -0,0 +1,687 @@
// Package migrate provides tooling to migrate data from InfluxDB 1.x to 2.x
package migrate
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/cmd/influx_inspect/buildtsi"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bufio"
"github.com/influxdata/influxdb/pkg/fs"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/seriesfile"
"github.com/influxdata/influxdb/tsdb/tsi1"
"github.com/influxdata/influxdb/tsdb/tsm1"
"go.uber.org/zap"
)
const (
dataDirName1x = "data"
internalDBName1x = "_internal"
importTempExtension = ".migrate"
// InfluxDB 1.x TSM index entry size.
tsmIndexEntrySize1x = 0 +
8 + // Block min time
8 + // Block max time
8 + // Offset of block
4 // Size in bytes of block
tsmKeyFieldSeparator1x = "#!~#" // tsm1 key field separator.
)
type Config struct {
SourcePath string
DestPath string
DestOrg influxdb.ID
From int64
To int64
MigrateHotShard bool
DryRun bool
// Stdout is optional; provide it to emit progress output.
Stdout io.Writer
VerboseLogging bool
}
// A Migrator migrates TSM data from an InfluxDB 1.x installation to InfluxDB 2.x.
type Migrator struct {
Config
store *bolt.KVStore // ref kept so we can clean up
metaSvc *kv.Service
verboseStdout io.Writer
current2xTSMGen int
}
func NewMigrator(c Config) *Migrator {
if c.Stdout == nil {
c.Stdout = ioutil.Discard
}
verboseStdout := ioutil.Discard
if c.VerboseLogging {
verboseStdout = c.Stdout
}
log := logger.New(c.Stdout)
boltClient := bolt.NewClient(log.With(zap.String("service", "bolt")))
boltClient.Path = filepath.Join(c.DestPath, "influxd.bolt")
store := bolt.NewKVStore(
log.With(zap.String("service", "kvstore-bolt")),
filepath.Join(c.DestPath, "influxd.bolt"),
)
store.WithDB(boltClient.DB())
if err := store.Open(context.Background()); err != nil {
panic(err)
}
metaSvc := kv.NewService(log.With(zap.String("store", "kv")), store)
// Update the destination path - we only care about the tsm store now.
c.DestPath = filepath.Join(c.DestPath, "engine", "data")
return &Migrator{Config: c, store: store, metaSvc: metaSvc, verboseStdout: verboseStdout}
}
// shardMapping provides a mapping between a 1.x shard and a bucket in 2.x
type shardMapping struct {
path string
bucketID influxdb.ID
}
// Process1xShards migrates the contents of any matching 1.x shards.
//
// The caller can filter shards by database and retention policy.
// Providing the zero value for the filters will result in all shards being
// migrated, with the exception of the `_internal` database, which is never
// migrated unless explicitly filtered on.
func (m *Migrator) Process1xShards(dbFilter, rpFilter string) error {
defer m.store.Close()
// determine current gen
fs := tsm1.NewFileStore(m.DestPath)
if err := fs.Open(context.Background()); err != nil {
return err
}
m.current2xTSMGen = fs.NextGeneration()
fs.Close()
var (
toProcessShards []shardMapping
curDB, curRP string // track current db and rp
bucketID influxdb.ID // track current bucket ID
)
err := walkShardDirs(filepath.Join(m.SourcePath, dataDirName1x), func(db string, rp string, path string) error {
if dbFilter == "" && db == internalDBName1x {
return nil // Don't import TSM data from _internal unless explicitly instructed to
}
// A database or retention policy filter has been specified and this
// shard path does not match it.
if (dbFilter != "" && db != dbFilter) || (rpFilter != "" && rp != rpFilter) {
return nil
}
var err error
if db != curDB || rp != curRP {
if bucketID, err = m.createBucket(db, rp); err != nil {
return err
}
curDB, curRP = db, rp
}
toProcessShards = append(toProcessShards, shardMapping{path: path, bucketID: bucketID})
return nil
})
if err != nil {
return err
}
// Sort shards so that for each database and retention policy, we handle
// them in the order they were created.
sortShardDirs(toProcessShards)
for _, shard := range toProcessShards {
now := time.Now()
if err := m.Process1xShard(shard.path, shard.bucketID); err != nil {
return err
}
fmt.Fprintf(m.Stdout, "Migrated shard %s to bucket %s in %v\n", shard.path, shard.bucketID.String(), time.Since(now))
}
fmt.Fprintln(m.Stdout, "Building TSI index")
sfilePath := filepath.Join(filepath.Dir(m.DestPath), storage.DefaultSeriesFileDirectoryName)
sfile := seriesfile.NewSeriesFile(sfilePath)
sfile.Logger = logger.New(m.verboseStdout)
if err := sfile.Open(context.Background()); err != nil {
return err
}
defer sfile.Close()
indexPath := filepath.Join(filepath.Dir(m.DestPath), storage.DefaultIndexDirectoryName)
// Check if TSI index exists.
if _, err = os.Stat(indexPath); err == nil {
if m.DryRun {
fmt.Fprintf(m.Stdout, "Would remove index located at %q\n", indexPath)
} else if err := os.RemoveAll(indexPath); err != nil { // Remove the index
return err
} else {
fmt.Fprintf(m.Stdout, "Removed existing TSI index at %q\n", indexPath)
}
} else if !os.IsNotExist(err) {
return err
}
if m.DryRun {
fmt.Fprintf(m.Stdout, "Would rebuild index at %q\n", indexPath)
return nil
}
walPath := filepath.Join(filepath.Dir(m.DestPath), storage.DefaultWALDirectoryName)
err = buildtsi.IndexShard(sfile, indexPath, m.DestPath, walPath,
tsi1.DefaultMaxIndexLogFileSize, uint64(tsm1.DefaultCacheMaxMemorySize),
10000, logger.New(m.verboseStdout), false)
if err != nil {
msg := fmt.Sprintf(`
**ERROR** - TSI index rebuild failed.
The index has potentially been left in an unrecoverable state. Indexes can be rebuilt
using the 'influxd inspect build-tsi' command.
Step 1: remove TSI index with '$ rm -rf %s'
Step 2: run '$ influxd inspect build-tsi'
Original error: %v
`, indexPath, err)
return errors.New(msg)
}
return nil
}
// sortShardDirs sorts shard directories in lexicographical order according to
// database and retention policy. Shards within the same database and
// retention policy are sorted numerically by shard id.
func sortShardDirs(shards []shardMapping) error {
var err2 error
sort.Slice(shards, func(i, j int) bool {
iDir := filepath.Dir(shards[i].path)
jDir := filepath.Dir(shards[j].path)
if iDir != jDir {
return iDir < jDir // db or rp differ
}
// Same db and rp. Sort on shard id.
iID, err := strconv.Atoi(filepath.Base(shards[i].path))
if err != nil {
err2 = err
return false
}
jID, err := strconv.Atoi(filepath.Base(shards[j].path))
if err != nil {
err2 = err
return false
}
return iID < jID
})
return err2
}
func (m *Migrator) createBucket(db, rp string) (influxdb.ID, error) {
name := filepath.Join(db, rp)
bucket, err := m.metaSvc.FindBucketByName(context.Background(), m.DestOrg, name)
if err != nil {
// A not-found error is expected and ignored; any other error is fatal.
innerErr, ok := err.(*influxdb.Error)
if !ok || innerErr.Code != influxdb.ENotFound {
return 0, err
}
} else if bucket != nil {
// The bucket already exists; reuse it.
fmt.Fprintf(m.verboseStdout, "Bucket %q already exists with ID %s\n", name, bucket.ID.String())
return bucket.ID, nil
}
if !m.DryRun {
bucket = &influxdb.Bucket{
Name: name,
OrgID: m.DestOrg,
}
if err := m.metaSvc.CreateBucket(context.Background(), bucket); err != nil {
return 0, err
}
fmt.Fprintf(m.verboseStdout, "Created bucket %q with ID %s\n", name, bucket.ID.String())
} else {
fmt.Fprintf(m.Stdout, "Would create bucket %q\n", name)
// Dry run: no bucket is created, so there is no real ID to return.
return 0, nil
}
return bucket.ID, nil
}
// Process1xShard migrates the TSM data in a single 1.x shard to the 2.x data directory.
//
// First, the shard is checked to determine whether it is fully compacted. Hot shards
// are not migrated by default because the WAL is not processed, which could lead to
// data loss. Next, each TSM file's contents are checked to ensure they overlap the
// desired time range, and all matching data is migrated.
//
func (m *Migrator) Process1xShard(pth string, bucketID influxdb.ID) error {
// * Check full compaction
// * Stream TSM file into new TSM file
// - full blocks can be copied over if the time range overlaps.
// - partial blocks need to be decoded and written out up to the timestamp.
// - Index needs to include any entries that have at least one block overlapping
// the time range.
//
// TODO(edd): strategy for detecting hot shard - need to check for any
// existence of WAL files.
//
// Check for `tmp` files and identify TSM file(s) path.
var tsmPaths []string // A fully compacted shard may still contain multiple TSM files.
walkErr := filepath.Walk(pth, func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if strings.HasSuffix(p, fmt.Sprintf(".%s.%s", tsm1.TSMFileExtension, tsm1.CompactionTempExtension)) {
return fmt.Errorf("tmp TSM file detected at %q — aborting shard import", p)
} else if ext := filepath.Ext(p); ext == "."+tsm1.TSMFileExtension {
tsmPaths = append(tsmPaths, p)
}
// All other non-tsm shard contents are skipped.
return nil
})
if walkErr != nil {
return walkErr
}
if len(tsmPaths) == 0 {
return fmt.Errorf("no tsm data found at %q", pth)
}
var processed bool
for _, tsmPath := range tsmPaths {
fd, err := os.Open(tsmPath)
if err != nil {
return err
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
fd.Close()
return err
}
tsmMin, tsmMax := r.TimeRange()
if !r.OverlapsTimeRange(m.From, m.To) {
fmt.Fprintf(m.verboseStdout, "Skipping out-of-range (min-time: %v, max-time: %v) TSM file at path %q\n",
time.Unix(0, tsmMin), time.Unix(0, tsmMax), tsmPath)
if err := r.Close(); err != nil {
return err
}
fd.Close()
continue
}
processed = true // the generation needs to be incremented
now := time.Now()
// Entire TSM file is within the imported time range; copy all block data
// and rewrite TSM index.
if tsmMin >= m.From && tsmMax <= m.To {
if err := m.processTSMFileFast(r, fd, bucketID); err != nil {
r.Close()
fd.Close()
return fmt.Errorf("error processing TSM file %q: %v", tsmPath, err)
}
if err := r.Close(); err != nil {
return err
}
continue
}
if err := m.processTSMFile(r); err != nil {
r.Close()
return fmt.Errorf("error processing TSM file %q: %v", tsmPath, err)
}
fmt.Fprintf(m.verboseStdout, "Processed TSM file: %s in %v\n", tsmPath, time.Since(now))
if err := r.Close(); err != nil {
return err
}
}
// Before returning, increase the current 2.x generation so that the next shard's
// TSM files cannot clash with the ones written for this shard.
if processed {
// Determine how much to increase the generation by, based on the
// number of generations in this shard.
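// For example (hypothetical numbers): if the 2.x store's next generation was 7
// and this shard contained generations 1 through 3, its files were written out
// above as generations 7 through 9, so the next shard must start at generation 10.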
minGen, _, err := tsm1.DefaultParseFileName(tsmPaths[0])
if err != nil {
return err
}
maxGen, _, err := tsm1.DefaultParseFileName(tsmPaths[len(tsmPaths)-1])
if err != nil {
return err
}
m.current2xTSMGen += maxGen - minGen + 1
}
return nil
}
func (m *Migrator) processTSMFile(r *tsm1.TSMReader) error {
// TODO - support processing a partial TSM file.
//
// 0) Figure out destination TSM filename - see processTSMFileFast for how to do that.
// 1) For each block in the file - check the min/max time on the block (using the TSM index) overlap;
// 2) If they overlap completely then you can write the entire block (easy);
// 3) Otherwise, decompress the block and scan the timestamps - reject the portion(s) of the block that don't overlap;
// 4) Compress the new block back up and write it out
// 5) Re-sort the TSM index, removing any entries where you rejected the entire block. (sort1xTSMKeys will sort the keys properly for you).
panic("not yet implemented")
}
// processTSMFileFast processes all blocks in the provided TSM file, because all
// TSM data in the file is within the time range being imported.
func (m *Migrator) processTSMFileFast(r *tsm1.TSMReader, fi *os.File, bucketID influxdb.ID) (err error) {
gen, seq, err := tsm1.DefaultParseFileName(r.Path())
if err != nil {
return err
}
name := tsm1.DefaultFormatFileName(m.current2xTSMGen+gen-1, seq)
newPath := filepath.Join(m.DestPath, name+"."+tsm1.TSMFileExtension+importTempExtension)
if m.DryRun {
fmt.Fprintf(m.Stdout, "Migrating %s --> %s\n", r.Path(), newPath)
return nil
}
fo, err := writeCloser(r.Path(), newPath)
if err != nil {
return err
}
// If the file was written without error, rename it to drop the temporary import extension.
defer func() {
fo.Close()
if err == nil {
// Rename import file.
finalPath := strings.TrimSuffix(newPath, importTempExtension)
if err2 := fs.RenameFile(newPath, finalPath); err2 != nil {
err = err2
return
}
fmt.Fprintf(m.Stdout, "Migrated %s --> %s\n", r.Path(), finalPath)
}
}()
// Determine where the block data ends by reading the index offset from the file footer.
indexOffset, err := indexOffset(fi)
if err != nil {
return err
}
// Return to beginning of file and copy the header and all block data to
// new file.
if _, err = fi.Seek(0, io.SeekStart); err != nil {
return err
}
n, err := io.CopyN(fo, fi, int64(indexOffset))
if err != nil {
return err
} else if n != int64(indexOffset) {
return fmt.Errorf("short read of block data. Read %d/%d bytes", n, indexOffset)
}
// Gather keys - need to materialise them all because they have to be re-sorted
keys := make([][]byte, 0, 1000)
itr := r.Iterator(nil)
for itr.Next() {
keys = append(keys, itr.Key())
}
if itr.Err() != nil {
return itr.Err()
}
// Sort 1.x TSM keys according to their new 2.x values.
// Don't allocate the new keys though, otherwise you're doubling the heap
// requirements for this file's index, which could be ~2GB * 2.
sort1xTSMKeys(keys)
// Rewrite TSM index into new file.
var tagsBuf models.Tags // Buffer to use for each series.
var oldM []byte
var seriesKeyBuf []byte // Buffer to use for new series key.
var entriesBuf []tsm1.IndexEntry
newM := tsdb.EncodeName(m.DestOrg, bucketID)
for _, tsmKey := range keys {
sKey1x, fKey := tsm1.SeriesAndFieldFromCompositeKey(tsmKey)
oldM, tagsBuf = models.ParseKeyBytesWithTags(sKey1x, tagsBuf)
// Rewrite the measurement and tags.
sKey2x := rewriteSeriesKey(oldM, newM[:], fKey, tagsBuf, seriesKeyBuf[:0])
// The rewritten key is a series key, not yet a TSM key; convert it by appending the field.
sKey2x = append(sKey2x, tsmKeyFieldSeparator1xBytes...)
sKey2x = append(sKey2x, fKey...)
// Write the entries for the key back into new file.
if entriesBuf, err = r.ReadEntries(tsmKey, entriesBuf[:0]); err != nil {
return fmt.Errorf("unable to get entries for key %q. Error: %v", tsmKey, err)
}
typ, err := r.Type(tsmKey) // TODO(edd): the type could be captured during the earlier key iteration, outside this loop
if err != nil {
return fmt.Errorf("unable to get type for key %q. Error: %v", tsmKey, err)
}
if err := writeIndexEntries(fo, sKey2x, typ, entriesBuf); err != nil {
return err
}
}
// Write Footer.
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], indexOffset)
_, err = fo.Write(buf[:])
return err
}
var (
sortTSMKeysBufFirst []byte
sortTSMKeysBufSecond []byte
)
// sort1xTSMKeys sorts 1.x TSM keys lexicographically as if they were 2.x TSM keys.
//
// It is not safe to call sort1xTSMKeys concurrently because it uses shared
// buffers to reduce allocations.
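//
// Each 1.x key of the form "<series key>#!~#<field>" is compared as though it
// were "<series key>,\xff<field>", mirroring how the rewritten 2.x keys (which
// carry the field as the trailing \xff tag) will sort.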
func sort1xTSMKeys(keys [][]byte) {
sort.SliceStable(keys, func(i, j int) bool {
firstCutIdx := bytes.Index(keys[i], tsmKeyFieldSeparator1xBytes)
secondCutIdx := bytes.Index(keys[j], tsmKeyFieldSeparator1xBytes)
if cap(sortTSMKeysBufFirst) < firstCutIdx+1 {
sortTSMKeysBufFirst = append(sortTSMKeysBufFirst, make([]byte, firstCutIdx-len(sortTSMKeysBufFirst)+1)...)
}
sortTSMKeysBufFirst = sortTSMKeysBufFirst[:firstCutIdx+1]
copy(sortTSMKeysBufFirst, keys[i][:firstCutIdx])
sortTSMKeysBufFirst[len(sortTSMKeysBufFirst)-1] = ','
if cap(sortTSMKeysBufSecond) < secondCutIdx+1 {
sortTSMKeysBufSecond = append(sortTSMKeysBufSecond, make([]byte, secondCutIdx-len(sortTSMKeysBufSecond)+1)...)
}
sortTSMKeysBufSecond = sortTSMKeysBufSecond[:secondCutIdx+1]
copy(sortTSMKeysBufSecond, keys[j][:secondCutIdx])
sortTSMKeysBufSecond[len(sortTSMKeysBufSecond)-1] = ','
return bytes.Compare(
append(append(sortTSMKeysBufFirst, models.FieldKeyTagKeyBytes...), keys[i][firstCutIdx+len(tsmKeyFieldSeparator1x):]...),
append(append(sortTSMKeysBufSecond, models.FieldKeyTagKeyBytes...), keys[j][secondCutIdx+len(tsmKeyFieldSeparator1x):]...),
) < 0
})
}
var tsmKeyFieldSeparator1xBytes = []byte(tsmKeyFieldSeparator1x)
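// writeIndexEntries writes the 1.x TSM index entries for a single key: a
// 2-byte key length, the key itself, a 1-byte block type, a 2-byte entry
// count, and then one tsmIndexEntrySize1x (28-byte) record per block (min
// time, max time, block offset, block size), all big-endian.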
func writeIndexEntries(w io.Writer, key []byte, typ byte, entries []tsm1.IndexEntry) error {
var buf [5 + tsmIndexEntrySize1x]byte
binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
buf[2] = typ
binary.BigEndian.PutUint16(buf[3:5], uint16(len(entries)))
// Write the key length.
if _, err := w.Write(buf[0:2]); err != nil {
return fmt.Errorf("write: writer key length error: %v", err)
}
// Write the key.
if _, err := w.Write(key); err != nil {
return fmt.Errorf("write: writer key error: %v", err)
}
// Write the block type and count
if _, err := w.Write(buf[2:5]); err != nil {
return fmt.Errorf("write: writer block type and count error: %v", err)
}
// Write each index entry for all blocks for this key
for _, entry := range entries {
entry.AppendTo(buf[5:])
n, err := w.Write(buf[5:])
if err != nil {
return err
} else if n != tsmIndexEntrySize1x {
return fmt.Errorf("incorrect number of bytes written for entry: %d", n)
}
}
return nil
}
// rewriteSeriesKey takes a 1.x tsm series key and rewrites it to
// a 2.x format by including the `_m`, `_f` tag pairs and a new measurement
// comprising the org/bucket id.
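//
// For illustration (hypothetical values): a 1.x series key "cpu,region=east"
// with field "usage", for an org/bucket whose encoded name is the 16-byte
// value <org+bucket>, is rewritten to:
//
//	<org+bucket>,\x00=cpu,region=east,\xff=usage
//
// processTSMFileFast then appends "#!~#usage" to produce the final 2.x TSM key.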
func rewriteSeriesKey(oldM, newM []byte, fkey []byte, tags models.Tags, buf []byte) []byte {
// Add the `_f` and `_m` tags.
tags = append(tags, models.Tag{}, models.Tag{}) // Make room for two new tags.
copy(tags[1:], tags) // Copy existing tags down.
tags[0] = models.NewTag(models.MeasurementTagKeyBytes, oldM)
tags[len(tags)-1] = models.NewTag(models.FieldKeyTagKeyBytes, fkey)
// Create a new series key using the new measurement name and tags.
return models.AppendMakeKey(buf, newM, tags)
}
func walkShardDirs(root string, fn func(db, rp, path string) error) error {
type location struct {
db, rp, path string
id int
}
dirs := map[string]location{}
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension {
shardDir := filepath.Dir(path)
id, err := strconv.Atoi(filepath.Base(shardDir))
if err != nil || id < 1 {
return fmt.Errorf("not a valid shard dir: %v", shardDir)
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
parts := strings.Split(absPath, string(filepath.Separator))
db, rp := parts[len(parts)-4], parts[len(parts)-3]
dirs[shardDir] = location{db: db, rp: rp, id: id, path: shardDir}
return nil
}
return nil
}); err != nil {
return err
}
dirsSlice := make([]location, 0, len(dirs))
for _, v := range dirs {
dirsSlice = append(dirsSlice, v)
}
sort.Slice(dirsSlice, func(i, j int) bool {
return dirsSlice[i].id < dirsSlice[j].id
})
for _, shard := range dirsSlice {
if err := fn(shard.db, shard.rp, shard.path); err != nil {
return err
}
}
return nil
}
// writeCloser initialises an io.WriteCloser for writing a new TSM file.
func writeCloser(src, dst string) (io.WriteCloser, error) {
fd, err := os.Create(dst)
if err != nil {
return nil, err
}
w := bufio.NewWriterSize(fd, 1<<20)
return w, nil
}
// indexOffset returns the offset to the TSM index of the provided file, which
// must be a valid TSM file.
func indexOffset(fd *os.File) (uint64, error) {
_, err := fd.Seek(-8, io.SeekEnd)
if err != nil {
return 0, err
}
buf := make([]byte, 8)
n, err := fd.Read(buf)
if err != nil {
return 0, err
} else if n != 8 {
return 0, fmt.Errorf("short read of index offset on file %q", fd.Name())
}
return binary.BigEndian.Uint64(buf), nil
}

@@ -0,0 +1,110 @@
package migrate
import (
"reflect"
"testing"
"github.com/influxdata/influxdb/pkg/slices"
)
func Test_sortShardDirs(t *testing.T) {
input := []shardMapping{
{path: "/influxdb/data/db0/autogen/0"},
{path: "/influxdb/data/db0/rp0/10"},
{path: "/influxdb/data/db0/autogen/10"},
{path: "/influxdb/data/db0/autogen/2"},
{path: "/influxdb/data/db0/autogen/43"},
{path: "/influxdb/data/apple/rp1/99"},
{path: "/influxdb/data/apple/rp2/0"},
{path: "/influxdb/data/db0/autogen/33"},
}
expected := []shardMapping{
{path: "/influxdb/data/apple/rp1/99"},
{path: "/influxdb/data/apple/rp2/0"},
{path: "/influxdb/data/db0/autogen/0"},
{path: "/influxdb/data/db0/autogen/2"},
{path: "/influxdb/data/db0/autogen/10"},
{path: "/influxdb/data/db0/autogen/33"},
{path: "/influxdb/data/db0/autogen/43"},
{path: "/influxdb/data/db0/rp0/10"},
}
if err := sortShardDirs(input); err != nil {
t.Fatal(err)
}
if got, exp := input, expected; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, expected %v", got, expected)
}
input = append(input, shardMapping{path: "/influxdb/data/db0/rp0/badformat"})
if err := sortShardDirs(input); err == nil {
t.Fatal("expected error, got <nil>")
}
}
var sep = tsmKeyFieldSeparator1x
func Test_sort1xTSMKeys(t *testing.T) {
cases := []struct {
input [][]byte
expected [][]byte
}{
{
input: slices.StringsToBytes(
"cpu"+sep+"a",
"cpu"+sep+"b",
"cpu"+sep+"c",
"disk"+sep+"a",
),
expected: slices.StringsToBytes(
"cpu"+sep+"a",
"cpu"+sep+"b",
"cpu"+sep+"c",
"disk"+sep+"a",
),
},
{
input: slices.StringsToBytes(
"cpu"+sep+"c",
"cpu,region=east"+sep+"b",
"cpu,region=east,server=a"+sep+"a",
),
expected: slices.StringsToBytes(
"cpu,region=east,server=a"+sep+"a",
"cpu,region=east"+sep+"b",
"cpu"+sep+"c",
),
},
{
input: slices.StringsToBytes(
"cpu"+sep+"c",
"cpu,region=east"+sep+"b",
"cpu,region=east,server=a"+sep+"a",
),
expected: slices.StringsToBytes(
"cpu,region=east,server=a"+sep+"a",
"cpu,region=east"+sep+"b",
"cpu"+sep+"c",
),
},
{
input: slices.StringsToBytes(
"\xc1\xbd\xd5)x!\a#H\xd4\xf3ç\xde\v\x14,\x00=m0,tag0=value1#!~#v0",
"\xc1\xbd\xd5)x!\a#H\xd4\xf3ç\xde\v\x14,\x00=m0,tag0=value19,tag1=value999,tag2=value9,tag3=value0#!~#v0",
),
expected: slices.StringsToBytes(
"\xc1\xbd\xd5)x!\a#H\xd4\xf3ç\xde\v\x14,\x00=m0,tag0=value1"+sep+"v0",
"\xc1\xbd\xd5)x!\a#H\xd4\xf3ç\xde\v\x14,\x00=m0,tag0=value19,tag1=value999,tag2=value9,tag3=value0"+sep+"v0",
),
},
}
for _, tc := range cases {
sort1xTSMKeys(tc.input)
if got, exp := tc.input, tc.expected; !reflect.DeepEqual(got, exp) {
t.Errorf("got %s, expected %s", got, exp)
}
}
}