Merge pull request #7189 from influxdata/ga-inspect

Add time range options to influx_inspect export

commit df7e609c3e
@@ -5,10 +5,13 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"math"
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
+	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
 )
@@ -18,13 +21,36 @@ type cmdExport struct {
 	out             string
 	db              string
 	retentionPolicy string
+	startTime       int64
+	endTime         int64
 	compress        bool
 
 	ext   string
 	files map[string][]string
 }
 
-func newCmdExport(path, out, db, retentionPolicy string, compress bool) *cmdExport {
+func newCmdExport(path, out, db, retentionPolicy, startTime, endTime string, compress bool) (*cmdExport, error) {
+	var start, end int64
+	if startTime != "" {
+		s, err := time.Parse(time.RFC3339, startTime)
+		if err != nil {
+			return nil, err
+		}
+		start = s.UnixNano()
+	} else {
+		start = math.MinInt64
+	}
+	if endTime != "" {
+		e, err := time.Parse(time.RFC3339, endTime)
+		if err != nil {
+			return nil, err
+		}
+		end = e.UnixNano()
+	} else {
+		// set end time to max if it is not set.
+		end = math.MaxInt64
+	}
+
 	return &cmdExport{
 		path:            filepath.Join(path, "data"),
 		out:             out,
@@ -32,8 +58,10 @@ func newCmdExport(path, out, db, retentionPolicy string, compress bool) *cmdExpo
 		compress:        compress,
 		ext:             fmt.Sprintf(".%s", tsm1.TSMFileExtension),
 		retentionPolicy: retentionPolicy,
+		startTime:       start,
+		endTime:         end,
 		files:           make(map[string][]string),
-	}
+	}, nil
 }
 
 func (c *cmdExport) validate() error {
@@ -41,6 +69,9 @@ func (c *cmdExport) validate() error {
 	if c.retentionPolicy != "" && c.db == "" {
 		return fmt.Errorf("must specify a db")
 	}
+	if c.startTime != 0 && c.endTime != 0 && c.endTime < c.startTime {
+		return fmt.Errorf("end time before start time")
+	}
 	return nil
 }
 
@@ -100,12 +131,15 @@ func (c *cmdExport) writeFiles() error {
 		defer w.Close()
 	}
 
+	s, e := time.Unix(0, c.startTime).Format(time.RFC3339), time.Unix(0, c.endTime).Format(time.RFC3339)
+	fmt.Fprintf(w, "# INFLUXDB EXPORT: %s - %s\n", s, e)
+
 	// Write out all the DDL
 	fmt.Fprintln(w, "# DDL")
 	for key, _ := range c.files {
 		keys := strings.Split(key, string(byte(os.PathSeparator)))
-		fmt.Fprintf(w, "CREATE DATABASE %s\n", keys[0])
-		fmt.Fprintf(w, "CREATE RETENTION POLICY %s ON %s DURATION inf REPLICATION 1\n", keys[1], keys[0])
+		db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1])
+		fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp)
 	}
 
 	fmt.Fprintln(w, "# DML")
@@ -129,6 +163,10 @@ func (c *cmdExport) writeFiles() error {
 			}
 			defer reader.Close()
 
+			if sgStart, sgEnd := reader.TimeRange(); sgStart > c.endTime || sgEnd < c.startTime {
+				return nil
+			}
+
 			for i := 0; i < reader.KeyCount(); i++ {
 				var pairs string
 				key, typ := reader.KeyAt(i)
@@ -136,6 +174,10 @@ func (c *cmdExport) writeFiles() error {
 				measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
 
 				for _, value := range values {
+					if (value.UnixNano() < c.startTime) || (value.UnixNano() > c.endTime) {
+						continue
+					}
+
 					switch typ {
 					case tsm1.BlockFloat64:
 						pairs = field + "=" + fmt.Sprintf("%v", value.Value())
@@ -149,7 +191,7 @@ func (c *cmdExport) writeFiles() error {
 						pairs = field + "=" + fmt.Sprintf("%v", value.Value())
 					}
 
-					fmt.Fprintln(w, measurement, pairs, value.UnixNano())
+					fmt.Fprintln(w, string(measurement), pairs, value.UnixNano())
 				}
 			}
 			return nil
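The core of the change is the time-bound handling added to newCmdExport: an empty start or end string (wired to the new -start-time/-end-time flags in the main.go hunks below) leaves that side of the window unbounded. A minimal standalone sketch of that pattern, using a hypothetical parseTimeRange helper that is not part of the patch, might look like this:

	package main

	import (
		"fmt"
		"math"
		"time"
	)

	// parseTimeRange is a hypothetical helper (not part of the patch) that mirrors the
	// logic added to newCmdExport: an empty string leaves that side of the window open.
	func parseTimeRange(startTime, endTime string) (int64, int64, error) {
		start, end := int64(math.MinInt64), int64(math.MaxInt64)
		if startTime != "" {
			s, err := time.Parse(time.RFC3339, startTime)
			if err != nil {
				return 0, 0, err
			}
			start = s.UnixNano()
		}
		if endTime != "" {
			e, err := time.Parse(time.RFC3339, endTime)
			if err != nil {
				return 0, 0, err
			}
			end = e.UnixNano()
		}
		return start, end, nil
	}

	func main() {
		// Only a start bound is supplied; the end bound stays at math.MaxInt64.
		start, end, err := parseTimeRange("2016-01-01T00:00:00Z", "")
		if err != nil {
			fmt.Println("parse error:", err)
			return
		}
		fmt.Println(start, end)
	}
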
@@ -104,13 +104,15 @@ func main() {
 		}
 		cmdVerify(path)
 	case "export":
-		var path, out, db, rp string
+		var path, out, db, rp, start, end string
 		var compress bool
 		fs := flag.NewFlagSet("export", flag.ExitOnError)
 		fs.StringVar(&path, "dir", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]")
 		fs.StringVar(&out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to")
 		fs.StringVar(&db, "db", "", "Optional: the database to export")
 		fs.StringVar(&rp, "rp", "", "Optional: the retention policy to export (requires db parameter to be specified)")
+		fs.StringVar(&start, "start-time", "", "Optional: the start time to export")
+		fs.StringVar(&end, "end-time", "", "Optional: the end time to export")
 		fs.BoolVar(&compress, "compress", false, "Compress the output")
 
 		fs.Usage = func() {
@@ -124,7 +126,11 @@ func main() {
 			fmt.Printf("%v", err)
 			os.Exit(1)
 		}
-		c := newCmdExport(path, out, db, rp, compress)
+		c, err := newCmdExport(path, out, db, rp, start, end, compress)
+		if err != nil {
+			fmt.Printf("%v", err)
+			os.Exit(1)
+		}
 		if err := c.run(); err != nil {
 			fmt.Println(err)
 			os.Exit(1)
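With the flag wiring above, a hypothetical invocation of the extended command (database name, retention policy, output path, and timestamps are placeholder values) would be:

	influx_inspect export -db mydb -rp autogen -out /tmp/export -start-time 2016-01-01T00:00:00Z -end-time 2016-02-01T00:00:00Z -compress

Omitting either time flag leaves that bound open, since newCmdExport falls back to math.MinInt64 / math.MaxInt64.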