diff --git a/cmd/influx_tools/compact/command.go b/cmd/influx_tools/compact/command.go index 1afc3ddc04..a63178fcce 100644 --- a/cmd/influx_tools/compact/command.go +++ b/cmd/influx_tools/compact/command.go @@ -2,24 +2,25 @@ package compact import ( "bufio" - "context" + "bytes" "errors" "flag" "fmt" "io" "os" "path/filepath" + "runtime" "sort" - "strconv" "strings" + "time" + "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary" "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line" - "github.com/influxdata/influxdb/cmd/influx_tools/internal/shard" "github.com/influxdata/influxdb/cmd/influx_tools/internal/storage" "github.com/influxdata/influxdb/cmd/influx_tools/server" - "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/logger" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/tsdb/engine/tsm1" "go.uber.org/zap" ) @@ -39,11 +40,9 @@ type Command struct { server server.Interface store *storage.Store - configPath string - database string - rp string - shardID uint64 - force bool + path string + force bool + verbose bool } // NewCommand returns a new instance of the export Command. @@ -62,39 +61,26 @@ func (cmd *Command) Run(args []string) (err error) { return err } - err = cmd.server.Open(cmd.configPath) - if err != nil { - return err - } - defer cmd.server.Close() - - err = cmd.openStore() - if err != nil { - return err - } - defer cmd.closeStore() - - if sh := cmd.getShard(); sh == nil { - return fmt.Errorf("shard %d does not exist", cmd.shardID) - } else if sh.IsIdle() { - fmt.Printf("shard %d is fully compacted\n", cmd.shardID) - return nil + var log = zap.NewNop() + if cmd.verbose { + cfg := logger.Config{Format: "logfmt"} + log, err = cfg.New(os.Stdout) + if err != nil { + return err + } } - dataPath := filepath.Join(cmd.server.TSDBConfig().Dir, cmd.database, cmd.rp) - shardDataPath := filepath.Join(dataPath, strconv.Itoa(int(cmd.shardID))) - walPath := filepath.Join(cmd.server.TSDBConfig().WALDir, cmd.database, cmd.rp) - shardWalPath := filepath.Join(walPath, strconv.Itoa(int(cmd.shardID))) + fmt.Printf("opening shard at path %q\n\n", cmd.path) - // snapshot existing of shard files (tsm, tombstone and wal) - files, err := newShardFiles(shardDataPath, shardWalPath) + sc, err := newShardCompactor(cmd.path, log) if err != nil { return err } + fmt.Println() fmt.Println("The following files will be compacted:") fmt.Println() - fmt.Println(files.String()) + fmt.Println(sc.String()) if !cmd.force { fmt.Print("Proceed? [N] ") @@ -109,148 +95,15 @@ func (cmd *Command) Run(args []string) (err error) { } } - gen, seq := 1, 4 - if len(files.tsm) > 0 { - sort.Strings(files.tsm) - gen, _, err = tsm1.DefaultParseFileName(files.tsm[len(files.tsm)-1]) - if err != nil { - return fmt.Errorf("failed to parse tsm file %q: %v", files.tsm[len(files.tsm)-1], err) - } - gen++ - } + fmt.Println("Compacting shard.") - rs, err := cmd.read() + err = sc.CompactShard() if err != nil { - return fmt.Errorf("read error: %v\n", err) - } - - if rs == nil { - fmt.Printf("no data to read") - return nil - } - - sw := shard.NewWriter(cmd.shardID, dataPath, shard.Temporary(), shard.Generation(gen), shard.Sequence(seq)) - - var countSeries, countValues int - - values := make(tsm1.Values, 1000) // max block size - var keyFieldSeparatorBytes = []byte("#!~#") - - makeKey := func(name []byte, tags models.Tags, field []byte) []byte { - sz := 0 + - len(name) + - 1 + // name delimiter - tags.Size() + // total size of tags in bytes - len(tags) - 1 + // tag delimiters - len(keyFieldSeparatorBytes) + - len(field) - - key := make([]byte, sz) - models.AppendMakeKey(key, name, tags) - key = append(key, keyFieldSeparatorBytes...) - key = append(key, field...) - return key - } - - for rs.Next() { - countSeries++ - seriesKey := makeKey(rs.Name(), rs.Tags(), rs.Field()) - ci := rs.CursorIterator() - - for ci.Next() { - cur := ci.Cursor() - switch c := cur.(type) { - case tsdb.IntegerBatchCursor: - for { - keys, vals := c.Next() - if len(keys) == 0 { - break - } - countValues += len(keys) - for i, k := range keys { - values[i] = tsm1.NewIntegerValue(k, vals[i]) - } - sw.Write(seriesKey, values[:len(keys)]) - } - case tsdb.FloatBatchCursor: - for { - keys, vals := c.Next() - if len(keys) == 0 { - break - } - countValues += len(keys) - for i, k := range keys { - values[i] = tsm1.NewFloatValue(k, vals[i]) - } - sw.Write(seriesKey, values[:len(keys)]) - } - case tsdb.UnsignedBatchCursor: - for { - keys, vals := c.Next() - if len(keys) == 0 { - break - } - countValues += len(keys) - for i, k := range keys { - values[i] = tsm1.NewUnsignedValue(k, vals[i]) - } - sw.Write(seriesKey, values[:len(keys)]) - } - case tsdb.BooleanBatchCursor: - for { - keys, vals := c.Next() - if len(keys) == 0 { - break - } - countValues += len(keys) - for i, k := range keys { - values[i] = tsm1.NewBooleanValue(k, vals[i]) - } - sw.Write(seriesKey, values[:len(keys)]) - } - case tsdb.StringBatchCursor: - for { - keys, vals := c.Next() - if len(keys) == 0 { - break - } - countValues += len(keys) - for i, k := range keys { - values[i] = tsm1.NewStringValue(k, vals[i]) - } - sw.Write(seriesKey, values[:len(keys)]) - } - case nil: - // no data for series key + field combination in this shard - continue - default: - panic(fmt.Sprintf("unreachable: %T", c)) - } - cur.Close() - } - } - - fmt.Printf("processed %d series, %d values\n", countSeries, countValues) - - rs.Close() - sw.Close() - if sw.Err() != nil { - for _, f := range sw.Files() { - os.Remove(f) - } - return sw.Err() - } - - cmd.closeStore() // close TSDB store to release files - - newFiles, err := files.replace(sw.Files()) - if err != nil { - fmt.Printf("Compaction failed: unable to replace files\n%v", err) - return errors.New("unable to replace files") + return fmt.Errorf("compaction failed: %v", err) } fmt.Println("Compaction succeeded. New files:") - for _, f := range newFiles { + for _, f := range sc.newTSM { fmt.Printf(" %s\n", f) } @@ -259,81 +112,205 @@ func (cmd *Command) Run(args []string) (err error) { func (cmd *Command) parseFlags(args []string) error { fs := flag.NewFlagSet("compact-shard", flag.ContinueOnError) - fs.StringVar(&cmd.configPath, "config", "", "Config file") - fs.StringVar(&cmd.database, "database", "", "Database name") - fs.StringVar(&cmd.rp, "rp", "", "Retention policy name") - fs.Uint64Var(&cmd.shardID, "shard-id", 0, "Shard ID to compact") + fs.StringVar(&cmd.path, "path", "", "path of shard to be compacted") fs.BoolVar(&cmd.force, "force", false, "Force compaction without prompting") + fs.BoolVar(&cmd.verbose, "verbose", false, "Enable verbose logging") if err := fs.Parse(args); err != nil { return err } - if cmd.database == "" { - return errors.New("database is required") - } - - if cmd.rp == "" { - return errors.New("rp is required") - } - - if cmd.shardID == 0 { - return errors.New("shard-id is required") + if cmd.path == "" { + return errors.New("shard-path is required") } return nil } -func (cmd *Command) openStore() error { - store := tsdb.NewStore(cmd.server.TSDBConfig().Dir) - store.EngineOptions.MonitorDisabled = true - store.EngineOptions.CompactionDisabled = true - store.EngineOptions.Config = cmd.server.TSDBConfig() - store.EngineOptions.EngineVersion = cmd.server.TSDBConfig().Engine - store.EngineOptions.IndexVersion = cmd.server.TSDBConfig().Index - store.EngineOptions.DatabaseFilter = func(database string) bool { - return database == cmd.database - } - store.EngineOptions.RetentionPolicyFilter = func(_, rp string) bool { - return rp == cmd.rp - } - store.EngineOptions.ShardFilter = func(_, _ string, id uint64) bool { - return id == cmd.shardID +type shardCompactor struct { + logger *zap.Logger + path string + tsm []string + tombstone []string + readers []*tsm1.TSMReader + files map[string]*tsm1.TSMReader + newTSM []string +} + +func newShardCompactor(path string, logger *zap.Logger) (fs *shardCompactor, err error) { + fs = &shardCompactor{ + logger: logger, + path: path, + files: make(map[string]*tsm1.TSMReader), } - if err := store.Open(); err != nil { - return err + fs.tsm, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) + if err != nil { + return nil, fmt.Errorf("newFileStore: error reading tsm files at path %q: %v", path, err) + } + if len(fs.tsm) == 0 { + return nil, fmt.Errorf("newFileStore: no tsm files at path %q", path) + } + sort.Strings(fs.tsm) + + fs.tombstone, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", "tombstone"))) + if err != nil { + return nil, fmt.Errorf("error reading tombstone files: %v", err) } - cmd.store = &storage.Store{TSDBStore: store} + fs.readers = make([]*tsm1.TSMReader, 0, len(fs.tsm)) + err = fs.openFiles() + if err != nil { + return nil, err + } + + return fs, nil +} + +func (sc *shardCompactor) openFiles() error { + // struct to hold the result of opening each reader in a goroutine + type res struct { + r *tsm1.TSMReader + err error + } + + lim := limiter.NewFixed(runtime.GOMAXPROCS(0)) + + readerC := make(chan *res) + for i, fn := range sc.tsm { + file, err := os.OpenFile(fn, os.O_RDONLY, 0666) + if err != nil { + return fmt.Errorf("newFileStore: failed to open file %q: %v", fn, err) + } + + go func(idx int, file *os.File) { + // Ensure a limited number of TSM files are loaded at once. + // Systems which have very large datasets (1TB+) can have thousands + // of TSM files which can cause extremely long load times. + lim.Take() + defer lim.Release() + + start := time.Now() + df, err := tsm1.NewTSMReader(file) + sc.logger.Info("Opened file", + zap.String("path", file.Name()), + zap.Int("id", idx), + zap.Duration("duration", time.Since(start))) + + // If we are unable to read a TSM file then log the error, rename + // the file, and continue loading the shard without it. + if err != nil { + sc.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) + if e := os.Rename(file.Name(), file.Name()+"."+tsm1.BadTSMFileExtension); e != nil { + sc.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) + readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)} + return + } + } + + readerC <- &res{r: df} + }(i, file) + } + + for range sc.tsm { + res := <-readerC + if res.err != nil { + return res.err + } else if res.r == nil { + continue + } + sc.readers = append(sc.readers, res.r) + sc.files[res.r.Path()] = res.r + } + close(readerC) + sort.Sort(tsmReaders(sc.readers)) return nil } -func (cmd *Command) closeStore() { - if cmd.store != nil { - cmd.store.TSDBStore.Close() - cmd.store = nil +func (sc *shardCompactor) CompactShard() (err error) { + c := tsm1.NewCompactor() + c.Dir = sc.path + c.Size = tsm1.DefaultSegmentSize + c.FileStore = sc + c.Open() + + tsmFiles, err := c.CompactFull(sc.tsm) + if err == nil { + sc.newTSM, err = sc.replace(tsmFiles) } + return err } -func (cmd *Command) getShard() *tsdb.Shard { - return cmd.store.TSDBStore.Shard(cmd.shardID) -} - -func (cmd *Command) read() (*storage.ResultSet, error) { - sh := cmd.getShard() - if sh == nil { - return nil, nil +// replace replaces the existing shard files with temporary tsmFiles +func (sc *shardCompactor) replace(tsmFiles []string) ([]string, error) { + // rename .tsm.tmp → .tsm + var newNames []string + for _, file := range tsmFiles { + var newName = file[:len(file)-4] // remove extension + if err := os.Rename(file, newName); err != nil { + return nil, err + } + newNames = append(newNames, newName) } - req := storage.ReadRequest{ - Database: cmd.database, - RP: cmd.rp, - Shards: []*tsdb.Shard{sh}, - Start: models.MinNanoTime, - End: models.MaxNanoTime, + var errs errlist.ErrorList + + // close all readers + for _, r := range sc.readers { + r.Close() } - return cmd.store.Read(context.Background(), &req) + sc.readers = nil + sc.files = nil + + // remove existing .tsm and .tombstone + for _, file := range sc.tsm { + errs.Add(os.Remove(file)) + } + + for _, file := range sc.tombstone { + errs.Add(os.Remove(file)) + } + + return newNames, errs.Err() } + +func (sc *shardCompactor) NextGeneration() int { + panic("not implemented") +} + +func (sc *shardCompactor) TSMReader(path string) *tsm1.TSMReader { + r := sc.files[path] + if r != nil { + r.Ref() + } + return r +} + +func (sc *shardCompactor) String() string { + var sb bytes.Buffer + sb.WriteString("TSM:\n") + for _, f := range sc.tsm { + sb.WriteString(" ") + sb.WriteString(f) + sb.WriteByte('\n') + } + + if len(sc.tombstone) > 0 { + sb.WriteString("\nTombstone:\n") + for _, f := range sc.tombstone { + sb.WriteString(" ") + sb.WriteString(f) + sb.WriteByte('\n') + } + } + + return sb.String() +} + +type tsmReaders []*tsm1.TSMReader + +func (a tsmReaders) Len() int { return len(a) } +func (a tsmReaders) Less(i, j int) bool { return a[i].Path() < a[j].Path() } +func (a tsmReaders) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/cmd/influx_tools/compact/shardfiles.go b/cmd/influx_tools/compact/shardfiles.go deleted file mode 100644 index a41bdf84e2..0000000000 --- a/cmd/influx_tools/compact/shardfiles.go +++ /dev/null @@ -1,98 +0,0 @@ -package compact - -import ( - "bytes" - "fmt" - "os" - "path/filepath" - - "github.com/influxdata/influxdb/cmd/influx-tools/internal/errlist" - "github.com/influxdata/influxdb/tsdb/engine/tsm1" -) - -type shardFiles struct { - tsm []string - tombstone []string - wal []string -} - -func newShardFiles(dataPath, walPath string) (s *shardFiles, err error) { - s = &shardFiles{} - - s.tsm, err = filepath.Glob(filepath.Join(dataPath, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) - if err != nil { - return nil, fmt.Errorf("error reading tsm files: %v", err) - } - - s.tombstone, err = filepath.Glob(filepath.Join(dataPath, fmt.Sprintf("*.%s", "tombstone"))) - if err != nil { - return nil, fmt.Errorf("error reading tombstone files: %v", err) - } - - s.wal, err = filepath.Glob(filepath.Join(walPath, fmt.Sprintf("*.%s", tsm1.WALFileExtension))) - if err != nil { - return nil, fmt.Errorf("error reading tombstone files: %v", err) - } - - return s, nil -} - -// replace replaces the existing shard files with temporary tsmFiles -func (s *shardFiles) replace(tsmFiles []string) ([]string, error) { - // rename .tsm.tmp → .tsm - var newNames []string - for _, file := range tsmFiles { - var newName = file[:len(file)-4] // remove extension - if err := os.Rename(file, newName); err != nil { - return nil, err - } - newNames = append(newNames, newName) - } - - var errs errlist.ErrorList - - // remove existing .tsm, .tombstone and .wal files - for _, file := range s.tsm { - errs.Add(os.Remove(file)) - } - - for _, file := range s.tombstone { - errs.Add(os.Remove(file)) - } - - for _, file := range s.wal { - errs.Add(os.Remove(file)) - } - - return newNames, errs.Err() -} - -func (s *shardFiles) String() string { - var sb bytes.Buffer - sb.WriteString("TSM:\n") - for _, f := range s.tsm { - sb.WriteString(" ") - sb.WriteString(f) - sb.WriteByte('\n') - } - - if len(s.tombstone) > 0 { - sb.WriteString("\nTombstone:\n") - for _, f := range s.tombstone { - sb.WriteString(" ") - sb.WriteString(f) - sb.WriteByte('\n') - } - } - - if len(s.wal) > 0 { - sb.WriteString("\nWAL:\n") - for _, f := range s.wal { - sb.WriteString(" ") - sb.WriteString(f) - sb.WriteByte('\n') - } - } - - return sb.String() -}