influxdb/cmd/influx_tools/export/command.go

217 lines
5.0 KiB
Go

package export
import (
"compress/gzip"
"errors"
"flag"
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/line"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/text"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"go.uber.org/zap"
)
var (
_ line.Writer
_ binary.Writer
)
// Command represents the program execution for "store query".
type Command struct {
// Standard input/output, overridden for testing.
Stderr io.Writer
Stdout io.Writer
Logger *zap.Logger
server server.Interface
conflicts io.WriteCloser
configPath string
database string
rp string
shardDuration time.Duration
format string
r rangeValue
conflictPath string
ignore bool
print bool
}
// NewCommand returns a new instance of the export Command.
func NewCommand(server server.Interface) *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
server: server,
}
}
// Run executes the export command using the specified args.
func (cmd *Command) Run(args []string) (err error) {
err = cmd.parseFlags(args)
if err != nil {
return err
}
err = cmd.server.Open(cmd.configPath)
if err != nil {
return err
}
defer cmd.server.Close()
e, err := cmd.openExporter()
if err != nil {
return err
}
defer e.Close()
e.PrintPlan(cmd.Stderr)
if cmd.print {
return nil
}
if !cmd.ignore {
if f, err := os.Create(cmd.conflictPath); err != nil {
return err
} else {
cmd.conflicts = gzip.NewWriter(f)
defer func() {
cmd.conflicts.Close()
f.Close()
}()
}
}
var wr format.Writer
switch cmd.format {
case "line":
wr = line.NewWriter(cmd.Stdout)
case "binary":
wr = binary.NewWriter(cmd.Stdout, cmd.database, cmd.rp, cmd.shardDuration)
case "series":
wr = text.NewWriter(cmd.Stdout, text.Series)
case "values":
wr = text.NewWriter(cmd.Stdout, text.Values)
case "discard":
wr = format.Discard
}
defer errors2.Capture(&err, wr.Close)()
if cmd.conflicts != nil {
wr = format.NewConflictWriter(wr, line.NewWriter(cmd.conflicts))
} else {
wr = format.NewConflictWriter(wr, format.DevNull)
}
return e.WriteTo(wr)
}
func (cmd *Command) openExporter() (*exporter, error) {
cfg := &exporterConfig{Database: cmd.database, RP: cmd.rp, ShardDuration: cmd.shardDuration, Min: cmd.r.Min(), Max: cmd.r.Max()}
e, err := newExporter(cmd.server, cfg)
if err != nil {
return nil, err
}
return e, e.Open()
}
func (cmd *Command) parseFlags(args []string) error {
fs := flag.NewFlagSet("export", flag.ContinueOnError)
fs.StringVar(&cmd.configPath, "config", "", "Config file")
fs.StringVar(&cmd.database, "database", "", "Database name")
fs.StringVar(&cmd.rp, "rp", "", "Retention policy name")
fs.StringVar(&cmd.format, "format", "line", "Output format (line, binary)")
fs.StringVar(&cmd.conflictPath, "conflict-path", "", "File name for writing field conflicts using line protocol and gzipped")
fs.BoolVar(&cmd.ignore, "no-conflict-path", false, "Disable writing field conflicts to a file")
fs.Var(&cmd.r, "range", "Range of target shards to export (default: all)")
fs.BoolVar(&cmd.print, "print-only", false, "Print plan to stderr and exit")
fs.DurationVar(&cmd.shardDuration, "duration", time.Hour*24*7, "Target shard duration")
if err := fs.Parse(args); err != nil {
return err
}
if cmd.database == "" {
return errors.New("database is required")
}
switch cmd.format {
case "line", "binary", "series", "values", "discard":
default:
return fmt.Errorf("invalid format '%s'", cmd.format)
}
if cmd.conflictPath == "" && !cmd.ignore {
return errors.New("missing conflict-path")
}
return nil
}
type rangeValue struct {
min, max uint64
set bool
}
func (rv *rangeValue) Min() uint64 { return rv.min }
func (rv *rangeValue) Max() uint64 {
if !rv.set {
return math.MaxUint64
}
return rv.max
}
func (rv *rangeValue) String() string {
if rv.Min() == rv.Max() {
return fmt.Sprint(rv.min)
}
return fmt.Sprintf("[%d,%d]", rv.Min(), rv.Max())
}
func (rv *rangeValue) Set(v string) (err error) {
p := strings.Split(v, "-")
switch {
case len(p) == 1:
rv.min, err = strconv.ParseUint(p[0], 10, 64)
if err != nil {
return fmt.Errorf("range error: invalid number %s", v)
}
rv.max = rv.min
case len(p) == 2:
rv.min, err = strconv.ParseUint(p[0], 10, 64)
if err != nil {
return fmt.Errorf("range error: min value %q is not a positive number", p[0])
}
rv.max = math.MaxUint64
if len(p[1]) > 0 {
rv.max, err = strconv.ParseUint(p[1], 10, 64)
if err != nil {
return fmt.Errorf("range error: max value %q is not empty or a positive number", p[1])
}
}
default:
return fmt.Errorf("range error: %q is not a valid range", v)
}
if rv.min > rv.max {
return errors.New("range error: min > max")
}
rv.set = true
return nil
}