pr(influx_tools): PR feedback

pull/10029/merge
Stuart Carnie 2018-06-28 08:08:33 -07:00
parent 8c2ff02ede
commit e832514355
2 changed files with 21 additions and 27 deletions

View File

@ -17,8 +17,6 @@ import (
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary" "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line" "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/storage"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb/engine/tsm1" "github.com/influxdata/influxdb/tsdb/engine/tsm1"
@ -37,20 +35,16 @@ type Command struct {
Stdout io.Writer Stdout io.Writer
Logger *zap.Logger Logger *zap.Logger
server server.Interface
store *storage.Store
path string path string
force bool force bool
verbose bool verbose bool
} }
// NewCommand returns a new instance of the export Command. // NewCommand returns a new instance of the export Command.
func NewCommand(server server.Interface) *Command { func NewCommand() *Command {
return &Command{ return &Command{
Stderr: os.Stderr, Stderr: os.Stderr,
Stdout: os.Stdout, Stdout: os.Stdout,
server: server,
} }
} }
@ -70,20 +64,20 @@ func (cmd *Command) Run(args []string) (err error) {
} }
} }
fmt.Printf("opening shard at path %q\n\n", cmd.path) fmt.Fprintf(cmd.Stdout, "opening shard at path %q\n\n", cmd.path)
sc, err := newShardCompactor(cmd.path, log) sc, err := newShardCompactor(cmd.path, log)
if err != nil { if err != nil {
return err return err
} }
fmt.Println() fmt.Fprintln(cmd.Stdout)
fmt.Println("The following files will be compacted:") fmt.Fprintln(cmd.Stdout, "The following files will be compacted:")
fmt.Println() fmt.Fprintln(cmd.Stdout)
fmt.Println(sc.String()) fmt.Fprintln(cmd.Stdout, sc.String())
if !cmd.force { if !cmd.force {
fmt.Print("Proceed? [N] ") fmt.Fprint(cmd.Stdout, "Proceed? [N] ")
scan := bufio.NewScanner(os.Stdin) scan := bufio.NewScanner(os.Stdin)
scan.Scan() scan.Scan()
if scan.Err() != nil { if scan.Err() != nil {
@ -95,16 +89,16 @@ func (cmd *Command) Run(args []string) (err error) {
} }
} }
fmt.Println("Compacting shard.") fmt.Fprintln(cmd.Stdout, "Compacting shard.")
err = sc.CompactShard() err = sc.CompactShard()
if err != nil { if err != nil {
return fmt.Errorf("compaction failed: %v", err) return fmt.Errorf("compaction failed: %v", err)
} }
fmt.Println("Compaction succeeded. New files:") fmt.Fprintln(cmd.Stdout, "Compaction succeeded. New files:")
for _, f := range sc.newTSM { for _, f := range sc.newTSM {
fmt.Printf(" %s\n", f) fmt.Fprintf(cmd.Stdout, " %s\n", f)
} }
return nil return nil
@ -137,37 +131,37 @@ type shardCompactor struct {
newTSM []string newTSM []string
} }
func newShardCompactor(path string, logger *zap.Logger) (fs *shardCompactor, err error) { func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) {
fs = &shardCompactor{ sc = &shardCompactor{
logger: logger, logger: logger,
path: path, path: path,
files: make(map[string]*tsm1.TSMReader), files: make(map[string]*tsm1.TSMReader),
} }
fs.tsm, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) sc.tsm, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
if err != nil { if err != nil {
return nil, fmt.Errorf("newFileStore: error reading tsm files at path %q: %v", path, err) return nil, fmt.Errorf("newFileStore: error reading tsm files at path %q: %v", path, err)
} }
if len(fs.tsm) == 0 { if len(sc.tsm) == 0 {
return nil, fmt.Errorf("newFileStore: no tsm files at path %q", path) return nil, fmt.Errorf("newFileStore: no tsm files at path %q", path)
} }
sort.Strings(fs.tsm) sort.Strings(sc.tsm)
fs.tombstone, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", "tombstone"))) sc.tombstone, err = filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", "tombstone")))
if err != nil { if err != nil {
return nil, fmt.Errorf("error reading tombstone files: %v", err) return nil, fmt.Errorf("error reading tombstone files: %v", err)
} }
fs.readers = make([]*tsm1.TSMReader, 0, len(fs.tsm)) if err := sc.openFiles(); err != nil {
err = fs.openFiles()
if err != nil {
return nil, err return nil, err
} }
return fs, nil return sc, nil
} }
func (sc *shardCompactor) openFiles() error { func (sc *shardCompactor) openFiles() error {
sc.readers = make([]*tsm1.TSMReader, 0, len(sc.tsm))
// struct to hold the result of opening each reader in a goroutine // struct to hold the result of opening each reader in a goroutine
type res struct { type res struct {
r *tsm1.TSMReader r *tsm1.TSMReader

View File

@ -55,7 +55,7 @@ func (m *Main) Run(args ...string) error {
return fmt.Errorf("help failed: %s", err) return fmt.Errorf("help failed: %s", err)
} }
case "compact-shard": case "compact-shard":
c := compact.NewCommand(&ossServer{logger: zap.NewNop(), noClient: true}) c := compact.NewCommand()
if err := c.Run(args); err != nil { if err := c.Run(args); err != nil {
return fmt.Errorf("compact-shard failed: %s", err) return fmt.Errorf("compact-shard failed: %s", err)
} }