fix(influx-tools): Reimplement compact-shard using tsm1.Compactor
* utilizes `tsm1.Compactor#CompactFull` to fully compact the specified shard
* the WAL is unmodified
* added `-verbose` option to show progress as TSM files are opened

pull/10029/merge
parent
0887b38a65
commit
8c2ff02ede
|
@ -2,24 +2,25 @@ package compact
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"bytes"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line"
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/internal/shard"
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/internal/storage"
|
||||
"github.com/influxdata/influxdb/cmd/influx_tools/server"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -39,11 +40,9 @@ type Command struct {
|
|||
server server.Interface
|
||||
store *storage.Store
|
||||
|
||||
configPath string
|
||||
database string
|
||||
rp string
|
||||
shardID uint64
|
||||
force bool
|
||||
path string
|
||||
force bool
|
||||
verbose bool
|
||||
}
|
||||
|
||||
// NewCommand returns a new instance of the compact-shard Command.
|
||||
|
@ -62,39 +61,26 @@ func (cmd *Command) Run(args []string) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
err = cmd.server.Open(cmd.configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cmd.server.Close()
|
||||
|
||||
err = cmd.openStore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cmd.closeStore()
|
||||
|
||||
if sh := cmd.getShard(); sh == nil {
|
||||
return fmt.Errorf("shard %d does not exist", cmd.shardID)
|
||||
} else if sh.IsIdle() {
|
||||
fmt.Printf("shard %d is fully compacted\n", cmd.shardID)
|
||||
return nil
|
||||
var log = zap.NewNop()
|
||||
if cmd.verbose {
|
||||
cfg := logger.Config{Format: "logfmt"}
|
||||
log, err = cfg.New(os.Stdout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
dataPath := filepath.Join(cmd.server.TSDBConfig().Dir, cmd.database, cmd.rp)
|
||||
shardDataPath := filepath.Join(dataPath, strconv.Itoa(int(cmd.shardID)))
|
||||
walPath := filepath.Join(cmd.server.TSDBConfig().WALDir, cmd.database, cmd.rp)
|
||||
shardWalPath := filepath.Join(walPath, strconv.Itoa(int(cmd.shardID)))
|
||||
fmt.Printf("opening shard at path %q\n\n", cmd.path)
|
||||
|
||||
// snapshot of the existing shard files (tsm, tombstone and wal)
|
||||
files, err := newShardFiles(shardDataPath, shardWalPath)
|
||||
sc, err := newShardCompactor(cmd.path, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
fmt.Println("The following files will be compacted:")
|
||||
fmt.Println()
|
||||
fmt.Println(files.String())
|
||||
fmt.Println(sc.String())
|
||||
|
||||
if !cmd.force {
|
||||
fmt.Print("Proceed? [N] ")
|
||||
|
@ -109,148 +95,15 @@ func (cmd *Command) Run(args []string) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
gen, seq := 1, 4
|
||||
if len(files.tsm) > 0 {
|
||||
sort.Strings(files.tsm)
|
||||
gen, _, err = tsm1.DefaultParseFileName(files.tsm[len(files.tsm)-1])
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse tsm file %q: %v", files.tsm[len(files.tsm)-1], err)
|
||||
}
|
||||
gen++
|
||||
}
|
||||
fmt.Println("Compacting shard.")
|
||||
|
||||
rs, err := cmd.read()
|
||||
err = sc.CompactShard()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read error: %v\n", err)
|
||||
}
|
||||
|
||||
if rs == nil {
|
||||
fmt.Printf("no data to read")
|
||||
return nil
|
||||
}
|
||||
|
||||
sw := shard.NewWriter(cmd.shardID, dataPath, shard.Temporary(), shard.Generation(gen), shard.Sequence(seq))
|
||||
|
||||
var countSeries, countValues int
|
||||
|
||||
values := make(tsm1.Values, 1000) // max block size
|
||||
var keyFieldSeparatorBytes = []byte("#!~#")
|
||||
|
||||
makeKey := func(name []byte, tags models.Tags, field []byte) []byte {
|
||||
sz := 0 +
|
||||
len(name) +
|
||||
1 + // name delimiter
|
||||
tags.Size() + // total size of tags in bytes
|
||||
len(tags) - 1 + // tag delimiters
|
||||
len(keyFieldSeparatorBytes) +
|
||||
len(field)
|
||||
|
||||
key := make([]byte, sz)
|
||||
models.AppendMakeKey(key, name, tags)
|
||||
key = append(key, keyFieldSeparatorBytes...)
|
||||
key = append(key, field...)
|
||||
return key
|
||||
}
|
||||
|
||||
for rs.Next() {
|
||||
countSeries++
|
||||
seriesKey := makeKey(rs.Name(), rs.Tags(), rs.Field())
|
||||
ci := rs.CursorIterator()
|
||||
|
||||
for ci.Next() {
|
||||
cur := ci.Cursor()
|
||||
switch c := cur.(type) {
|
||||
case tsdb.IntegerBatchCursor:
|
||||
for {
|
||||
keys, vals := c.Next()
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
countValues += len(keys)
|
||||
for i, k := range keys {
|
||||
values[i] = tsm1.NewIntegerValue(k, vals[i])
|
||||
}
|
||||
sw.Write(seriesKey, values[:len(keys)])
|
||||
}
|
||||
case tsdb.FloatBatchCursor:
|
||||
for {
|
||||
keys, vals := c.Next()
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
countValues += len(keys)
|
||||
for i, k := range keys {
|
||||
values[i] = tsm1.NewFloatValue(k, vals[i])
|
||||
}
|
||||
sw.Write(seriesKey, values[:len(keys)])
|
||||
}
|
||||
case tsdb.UnsignedBatchCursor:
|
||||
for {
|
||||
keys, vals := c.Next()
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
countValues += len(keys)
|
||||
for i, k := range keys {
|
||||
values[i] = tsm1.NewUnsignedValue(k, vals[i])
|
||||
}
|
||||
sw.Write(seriesKey, values[:len(keys)])
|
||||
}
|
||||
case tsdb.BooleanBatchCursor:
|
||||
for {
|
||||
keys, vals := c.Next()
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
countValues += len(keys)
|
||||
for i, k := range keys {
|
||||
values[i] = tsm1.NewBooleanValue(k, vals[i])
|
||||
}
|
||||
sw.Write(seriesKey, values[:len(keys)])
|
||||
}
|
||||
case tsdb.StringBatchCursor:
|
||||
for {
|
||||
keys, vals := c.Next()
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
countValues += len(keys)
|
||||
for i, k := range keys {
|
||||
values[i] = tsm1.NewStringValue(k, vals[i])
|
||||
}
|
||||
sw.Write(seriesKey, values[:len(keys)])
|
||||
}
|
||||
case nil:
|
||||
// no data for series key + field combination in this shard
|
||||
continue
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", c))
|
||||
}
|
||||
cur.Close()
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("processed %d series, %d values\n", countSeries, countValues)
|
||||
|
||||
rs.Close()
|
||||
sw.Close()
|
||||
if sw.Err() != nil {
|
||||
for _, f := range sw.Files() {
|
||||
os.Remove(f)
|
||||
}
|
||||
return sw.Err()
|
||||
}
|
||||
|
||||
cmd.closeStore() // close TSDB store to release files
|
||||
|
||||
newFiles, err := files.replace(sw.Files())
|
||||
if err != nil {
|
||||
fmt.Printf("Compaction failed: unable to replace files\n%v", err)
|
||||
return errors.New("unable to replace files")
|
||||
return fmt.Errorf("compaction failed: %v", err)
|
||||
}
|
||||
|
||||
fmt.Println("Compaction succeeded. New files:")
|
||||
for _, f := range newFiles {
|
||||
for _, f := range sc.newTSM {
|
||||
fmt.Printf(" %s\n", f)
|
||||
}
|
||||
|
||||
|
@ -259,81 +112,205 @@ func (cmd *Command) Run(args []string) (err error) {
|
|||
|
||||
func (cmd *Command) parseFlags(args []string) error {
|
||||
fs := flag.NewFlagSet("compact-shard", flag.ContinueOnError)
|
||||
fs.StringVar(&cmd.configPath, "config", "", "Config file")
|
||||
fs.StringVar(&cmd.database, "database", "", "Database name")
|
||||
fs.StringVar(&cmd.rp, "rp", "", "Retention policy name")
|
||||
fs.Uint64Var(&cmd.shardID, "shard-id", 0, "Shard ID to compact")
|
||||
fs.StringVar(&cmd.path, "path", "", "path of shard to be compacted")
|
||||
fs.BoolVar(&cmd.force, "force", false, "Force compaction without prompting")
|
||||
fs.BoolVar(&cmd.verbose, "verbose", false, "Enable verbose logging")
|
||||
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cmd.database == "" {
|
||||
return errors.New("database is required")
|
||||
}
|
||||
|
||||
if cmd.rp == "" {
|
||||
return errors.New("rp is required")
|
||||
}
|
||||
|
||||
if cmd.shardID == 0 {
|
||||
return errors.New("shard-id is required")
|
||||
if cmd.path == "" {
|
||||
return errors.New("shard-path is required")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) openStore() error {
|
||||
store := tsdb.NewStore(cmd.server.TSDBConfig().Dir)
|
||||
store.EngineOptions.MonitorDisabled = true
|
||||
store.EngineOptions.CompactionDisabled = true
|
||||
store.EngineOptions.Config = cmd.server.TSDBConfig()
|
||||
store.EngineOptions.EngineVersion = cmd.server.TSDBConfig().Engine
|
||||
store.EngineOptions.IndexVersion = cmd.server.TSDBConfig().Index
|
||||
store.EngineOptions.DatabaseFilter = func(database string) bool {
|
||||
return database == cmd.database
|
||||
}
|
||||
store.EngineOptions.RetentionPolicyFilter = func(_, rp string) bool {
|
||||
return rp == cmd.rp
|
||||
}
|
||||
store.EngineOptions.ShardFilter = func(_, _ string, id uint64) bool {
|
||||
return id == cmd.shardID
|
||||
type shardCompactor struct {
|
||||
logger *zap.Logger
|
||||
path string
|
||||
tsm []string
|
||||
tombstone []string
|
||||
readers []*tsm1.TSMReader
|
||||
files map[string]*tsm1.TSMReader
|
||||
newTSM []string
|
||||
}
|
||||
|
||||
func newShardCompactor(path string, logger *zap.Logger) (fs *shardCompactor, err error) {
|
||||
fs = &shardCompactor{
|
||||
logger: logger,
|
||||
path: path,
|
||||
files: make(map[string]*tsm1.TSMReader),
|
||||
}
|
||||
|
||||
if err := store.Open(); err != nil {
|
||||
return err
|
||||
fs.tsm, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("newFileStore: error reading tsm files at path %q: %v", path, err)
|
||||
}
|
||||
if len(fs.tsm) == 0 {
|
||||
return nil, fmt.Errorf("newFileStore: no tsm files at path %q", path)
|
||||
}
|
||||
sort.Strings(fs.tsm)
|
||||
|
||||
fs.tombstone, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", "tombstone")))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading tombstone files: %v", err)
|
||||
}
|
||||
|
||||
cmd.store = &storage.Store{TSDBStore: store}
|
||||
fs.readers = make([]*tsm1.TSMReader, 0, len(fs.tsm))
|
||||
err = fs.openFiles()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
func (sc *shardCompactor) openFiles() error {
|
||||
// struct to hold the result of opening each reader in a goroutine
|
||||
type res struct {
|
||||
r *tsm1.TSMReader
|
||||
err error
|
||||
}
|
||||
|
||||
lim := limiter.NewFixed(runtime.GOMAXPROCS(0))
|
||||
|
||||
readerC := make(chan *res)
|
||||
for i, fn := range sc.tsm {
|
||||
file, err := os.OpenFile(fn, os.O_RDONLY, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("newFileStore: failed to open file %q: %v", fn, err)
|
||||
}
|
||||
|
||||
go func(idx int, file *os.File) {
|
||||
// Ensure a limited number of TSM files are loaded at once.
|
||||
// Systems which have very large datasets (1TB+) can have thousands
|
||||
// of TSM files which can cause extremely long load times.
|
||||
lim.Take()
|
||||
defer lim.Release()
|
||||
|
||||
start := time.Now()
|
||||
df, err := tsm1.NewTSMReader(file)
|
||||
sc.logger.Info("Opened file",
|
||||
zap.String("path", file.Name()),
|
||||
zap.Int("id", idx),
|
||||
zap.Duration("duration", time.Since(start)))
|
||||
|
||||
// If we are unable to read a TSM file then log the error, rename
|
||||
// the file, and continue loading the shard without it.
|
||||
if err != nil {
|
||||
sc.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
|
||||
if e := os.Rename(file.Name(), file.Name()+"."+tsm1.BadTSMFileExtension); e != nil {
|
||||
sc.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
|
||||
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
readerC <- &res{r: df}
|
||||
}(i, file)
|
||||
}
|
||||
|
||||
for range sc.tsm {
|
||||
res := <-readerC
|
||||
if res.err != nil {
|
||||
return res.err
|
||||
} else if res.r == nil {
|
||||
continue
|
||||
}
|
||||
sc.readers = append(sc.readers, res.r)
|
||||
sc.files[res.r.Path()] = res.r
|
||||
}
|
||||
close(readerC)
|
||||
sort.Sort(tsmReaders(sc.readers))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) closeStore() {
|
||||
if cmd.store != nil {
|
||||
cmd.store.TSDBStore.Close()
|
||||
cmd.store = nil
|
||||
func (sc *shardCompactor) CompactShard() (err error) {
|
||||
c := tsm1.NewCompactor()
|
||||
c.Dir = sc.path
|
||||
c.Size = tsm1.DefaultSegmentSize
|
||||
c.FileStore = sc
|
||||
c.Open()
|
||||
|
||||
tsmFiles, err := c.CompactFull(sc.tsm)
|
||||
if err == nil {
|
||||
sc.newTSM, err = sc.replace(tsmFiles)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (cmd *Command) getShard() *tsdb.Shard {
|
||||
return cmd.store.TSDBStore.Shard(cmd.shardID)
|
||||
}
|
||||
|
||||
func (cmd *Command) read() (*storage.ResultSet, error) {
|
||||
sh := cmd.getShard()
|
||||
if sh == nil {
|
||||
return nil, nil
|
||||
// replace replaces the existing shard files with temporary tsmFiles
|
||||
func (sc *shardCompactor) replace(tsmFiles []string) ([]string, error) {
|
||||
// rename .tsm.tmp → .tsm
|
||||
var newNames []string
|
||||
for _, file := range tsmFiles {
|
||||
var newName = file[:len(file)-4] // remove extension
|
||||
if err := os.Rename(file, newName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newNames = append(newNames, newName)
|
||||
}
|
||||
|
||||
req := storage.ReadRequest{
|
||||
Database: cmd.database,
|
||||
RP: cmd.rp,
|
||||
Shards: []*tsdb.Shard{sh},
|
||||
Start: models.MinNanoTime,
|
||||
End: models.MaxNanoTime,
|
||||
var errs errlist.ErrorList
|
||||
|
||||
// close all readers
|
||||
for _, r := range sc.readers {
|
||||
r.Close()
|
||||
}
|
||||
|
||||
return cmd.store.Read(context.Background(), &req)
|
||||
sc.readers = nil
|
||||
sc.files = nil
|
||||
|
||||
// remove existing .tsm and .tombstone
|
||||
for _, file := range sc.tsm {
|
||||
errs.Add(os.Remove(file))
|
||||
}
|
||||
|
||||
for _, file := range sc.tombstone {
|
||||
errs.Add(os.Remove(file))
|
||||
}
|
||||
|
||||
return newNames, errs.Err()
|
||||
}
|
||||
|
||||
func (sc *shardCompactor) NextGeneration() int {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (sc *shardCompactor) TSMReader(path string) *tsm1.TSMReader {
|
||||
r := sc.files[path]
|
||||
if r != nil {
|
||||
r.Ref()
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (sc *shardCompactor) String() string {
|
||||
var sb bytes.Buffer
|
||||
sb.WriteString("TSM:\n")
|
||||
for _, f := range sc.tsm {
|
||||
sb.WriteString(" ")
|
||||
sb.WriteString(f)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
|
||||
if len(sc.tombstone) > 0 {
|
||||
sb.WriteString("\nTombstone:\n")
|
||||
for _, f := range sc.tombstone {
|
||||
sb.WriteString(" ")
|
||||
sb.WriteString(f)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
type tsmReaders []*tsm1.TSMReader
|
||||
|
||||
func (a tsmReaders) Len() int { return len(a) }
|
||||
func (a tsmReaders) Less(i, j int) bool { return a[i].Path() < a[j].Path() }
|
||||
func (a tsmReaders) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
|
|
@ -1,98 +0,0 @@
|
|||
package compact
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influx-tools/internal/errlist"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
// shardFiles is a snapshot of the file names that make up a shard on disk,
// grouped by kind.
type shardFiles struct {
	tsm       []string // *.tsm files found in the shard's data directory
	tombstone []string // *.tombstone files found in the shard's data directory
	wal       []string // WAL segment files found in the shard's WAL directory
}
|
||||
|
||||
func newShardFiles(dataPath, walPath string) (s *shardFiles, err error) {
|
||||
s = &shardFiles{}
|
||||
|
||||
s.tsm, err = filepath.Glob(filepath.Join(dataPath, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading tsm files: %v", err)
|
||||
}
|
||||
|
||||
s.tombstone, err = filepath.Glob(filepath.Join(dataPath, fmt.Sprintf("*.%s", "tombstone")))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading tombstone files: %v", err)
|
||||
}
|
||||
|
||||
s.wal, err = filepath.Glob(filepath.Join(walPath, fmt.Sprintf("*.%s", tsm1.WALFileExtension)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading tombstone files: %v", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// replace replaces the existing shard files with temporary tsmFiles
|
||||
func (s *shardFiles) replace(tsmFiles []string) ([]string, error) {
|
||||
// rename .tsm.tmp → .tsm
|
||||
var newNames []string
|
||||
for _, file := range tsmFiles {
|
||||
var newName = file[:len(file)-4] // remove extension
|
||||
if err := os.Rename(file, newName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newNames = append(newNames, newName)
|
||||
}
|
||||
|
||||
var errs errlist.ErrorList
|
||||
|
||||
// remove existing .tsm, .tombstone and .wal files
|
||||
for _, file := range s.tsm {
|
||||
errs.Add(os.Remove(file))
|
||||
}
|
||||
|
||||
for _, file := range s.tombstone {
|
||||
errs.Add(os.Remove(file))
|
||||
}
|
||||
|
||||
for _, file := range s.wal {
|
||||
errs.Add(os.Remove(file))
|
||||
}
|
||||
|
||||
return newNames, errs.Err()
|
||||
}
|
||||
|
||||
func (s *shardFiles) String() string {
|
||||
var sb bytes.Buffer
|
||||
sb.WriteString("TSM:\n")
|
||||
for _, f := range s.tsm {
|
||||
sb.WriteString(" ")
|
||||
sb.WriteString(f)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
|
||||
if len(s.tombstone) > 0 {
|
||||
sb.WriteString("\nTombstone:\n")
|
||||
for _, f := range s.tombstone {
|
||||
sb.WriteString(" ")
|
||||
sb.WriteString(f)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.wal) > 0 {
|
||||
sb.WriteString("\nWAL:\n")
|
||||
for _, f := range s.wal {
|
||||
sb.WriteString(" ")
|
||||
sb.WriteString(f)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
Loading…
Reference in New Issue