Clean up export code, add tests and benchmarks
The export code was moved around a bit, particularly to ease testing export of a single TSM or WAL file. The functionality should not have changed.pull/7741/head
parent
a645dff54e
commit
da45aab52c
|
@ -8,7 +8,6 @@ import (
|
|||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -62,8 +61,8 @@ func (cmd *Command) Run(args ...string) error {
|
|||
fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to")
|
||||
fs.StringVar(&cmd.database, "database", "", "Optional: the database to export")
|
||||
fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)")
|
||||
fs.StringVar(&start, "start", "", "Optional: the start time to export")
|
||||
fs.StringVar(&end, "end", "", "Optional: the end time to export")
|
||||
fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)")
|
||||
fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)")
|
||||
fs.BoolVar(&cmd.compress, "compress", false, "Compress the output")
|
||||
|
||||
fs.SetOutput(cmd.Stdout)
|
||||
|
@ -106,7 +105,6 @@ func (cmd *Command) Run(args ...string) error {
|
|||
}
|
||||
|
||||
func (cmd *Command) validate() error {
|
||||
// validate args
|
||||
if cmd.retentionPolicy != "" && cmd.database == "" {
|
||||
return fmt.Errorf("must specify a db")
|
||||
}
|
||||
|
@ -123,84 +121,71 @@ func (cmd *Command) export() error {
|
|||
if err := cmd.walkWALFiles(); err != nil {
|
||||
return err
|
||||
}
|
||||
return cmd.writeFiles()
|
||||
return cmd.write()
|
||||
}
|
||||
|
||||
func (cmd *Command) walkTSMFiles() error {
|
||||
err := filepath.Walk(cmd.dataDir, func(dir string, f os.FileInfo, err error) error {
|
||||
return filepath.Walk(cmd.dataDir, func(path string, f os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check to see if this is a tsm file
|
||||
ext := fmt.Sprintf(".%s", tsm1.TSMFileExtension)
|
||||
if filepath.Ext(dir) != ext {
|
||||
if filepath.Ext(path) != "."+tsm1.TSMFileExtension {
|
||||
return nil
|
||||
}
|
||||
|
||||
relPath, _ := filepath.Rel(cmd.dataDir, dir)
|
||||
relPath, err := filepath.Rel(cmd.dataDir, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dirs := strings.Split(relPath, string(byte(os.PathSeparator)))
|
||||
if len(dirs) < 2 {
|
||||
return fmt.Errorf("invalid directory structure for %s", dir)
|
||||
return fmt.Errorf("invalid directory structure for %s", path)
|
||||
}
|
||||
if dirs[0] == cmd.database || cmd.database == "" {
|
||||
if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" {
|
||||
key := filepath.Join(dirs[0], dirs[1])
|
||||
files := cmd.tsmFiles[key]
|
||||
if files == nil {
|
||||
files = []string{}
|
||||
}
|
||||
cmd.manifest[key] = struct{}{}
|
||||
cmd.tsmFiles[key] = append(files, dir)
|
||||
cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) walkWALFiles() error {
|
||||
err := filepath.Walk(cmd.walDir, func(dir string, f os.FileInfo, err error) error {
|
||||
return filepath.Walk(cmd.walDir, func(path string, f os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check to see if this is a wal file
|
||||
prefix := tsm1.WALFilePrefix
|
||||
ext := fmt.Sprintf(".%s", tsm1.WALFileExtension)
|
||||
_, fileName := path.Split(dir)
|
||||
if filepath.Ext(dir) != ext || !strings.HasPrefix(fileName, prefix) {
|
||||
fileName := filepath.Base(path)
|
||||
if filepath.Ext(path) != "."+tsm1.WALFileExtension || !strings.HasPrefix(fileName, tsm1.WALFilePrefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
relPath, _ := filepath.Rel(cmd.walDir, dir)
|
||||
relPath, err := filepath.Rel(cmd.walDir, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dirs := strings.Split(relPath, string(byte(os.PathSeparator)))
|
||||
if len(dirs) < 2 {
|
||||
return fmt.Errorf("invalid directory structure for %s", dir)
|
||||
return fmt.Errorf("invalid directory structure for %s", path)
|
||||
}
|
||||
if dirs[0] == cmd.database || cmd.database == "" {
|
||||
if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" {
|
||||
key := filepath.Join(dirs[0], dirs[1])
|
||||
files := cmd.walFiles[key]
|
||||
if files == nil {
|
||||
files = []string{}
|
||||
}
|
||||
cmd.manifest[key] = struct{}{}
|
||||
cmd.walFiles[key] = append(files, dir)
|
||||
cmd.walFiles[key] = append(cmd.walFiles[key], path)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) writeFiles() error {
|
||||
func (cmd *Command) write() error {
|
||||
// open our output file and create an output buffer
|
||||
var w io.WriteCloser
|
||||
w, err := os.Create(cmd.out)
|
||||
|
@ -219,92 +204,42 @@ func (cmd *Command) writeFiles() error {
|
|||
// Write out all the DDL
|
||||
fmt.Fprintln(w, "# DDL")
|
||||
for key := range cmd.manifest {
|
||||
keys := strings.Split(key, string(byte(os.PathSeparator)))
|
||||
keys := strings.Split(key, string(os.PathSeparator))
|
||||
db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1])
|
||||
fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp)
|
||||
}
|
||||
|
||||
fmt.Fprintln(w, "# DML")
|
||||
for key := range cmd.manifest {
|
||||
keys := strings.Split(key, string(byte(os.PathSeparator)))
|
||||
keys := strings.Split(key, string(os.PathSeparator))
|
||||
fmt.Fprintf(w, "# CONTEXT-DATABASE:%s\n", keys[0])
|
||||
fmt.Fprintf(w, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1])
|
||||
if files, ok := cmd.tsmFiles[key]; ok {
|
||||
fmt.Printf("writing out tsm file data for %s...", key)
|
||||
fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key)
|
||||
if err := cmd.writeTsmFiles(w, files); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println("complete.")
|
||||
fmt.Fprintln(cmd.Stdout, "complete.")
|
||||
}
|
||||
if _, ok := cmd.walFiles[key]; ok {
|
||||
fmt.Printf("writing out wal file data for %s...", key)
|
||||
fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key)
|
||||
if err := cmd.writeWALFiles(w, cmd.walFiles[key], key); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println("complete.")
|
||||
fmt.Fprintln(cmd.Stdout, "complete.")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) writeTsmFiles(w io.WriteCloser, files []string) error {
|
||||
func (cmd *Command) writeTsmFiles(w io.Writer, files []string) error {
|
||||
fmt.Fprintln(w, "# writing tsm data")
|
||||
|
||||
// we need to make sure we write the same order that the files were written
|
||||
sort.Strings(files)
|
||||
|
||||
// use a function here to close the files in the defers and not let them accumulate in the loop
|
||||
write := func(f string) error {
|
||||
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
reader, err := tsm1.NewTSMReader(file)
|
||||
if err != nil {
|
||||
log.Printf("unable to read %s, skipping\n", f)
|
||||
return nil
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
if sgStart, sgEnd := reader.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < reader.KeyCount(); i++ {
|
||||
var pairs string
|
||||
key, typ := reader.KeyAt(i)
|
||||
values, _ := reader.ReadAll(string(key))
|
||||
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
|
||||
// measurements are stored escaped, field names are not
|
||||
field = escape.String(field)
|
||||
|
||||
for _, value := range values {
|
||||
if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch typ {
|
||||
case tsm1.BlockFloat64:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case tsm1.BlockInteger:
|
||||
pairs = field + "=" + fmt.Sprintf("%vi", value.Value())
|
||||
case tsm1.BlockBoolean:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case tsm1.BlockString:
|
||||
pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value())))
|
||||
default:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
}
|
||||
|
||||
fmt.Fprintln(w, string(measurement), pairs, value.UnixNano())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if err := write(f); err != nil {
|
||||
if err := cmd.exportTSMFile(f, w); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -312,85 +247,135 @@ func (cmd *Command) writeTsmFiles(w io.WriteCloser, files []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) writeWALFiles(w io.WriteCloser, files []string, key string) error {
|
||||
func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error {
|
||||
f, err := os.Open(tsmFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r, err := tsm1.NewTSMReader(f)
|
||||
if err != nil {
|
||||
log.Printf("unable to read %s, skipping: %s\n", f, err.Error())
|
||||
return nil
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if sgStart, sgEnd := r.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < r.KeyCount(); i++ {
|
||||
var pairs string
|
||||
key, typ := r.KeyAt(i)
|
||||
values, _ := r.ReadAll(string(key))
|
||||
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
|
||||
// measurements are stored escaped, field names are not
|
||||
field = escape.String(field)
|
||||
|
||||
for _, value := range values {
|
||||
if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch typ {
|
||||
case tsm1.BlockFloat64:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case tsm1.BlockInteger:
|
||||
pairs = field + "=" + fmt.Sprintf("%vi", value.Value())
|
||||
case tsm1.BlockBoolean:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case tsm1.BlockString:
|
||||
pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value())))
|
||||
default:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
}
|
||||
|
||||
fmt.Fprintln(w, string(measurement), pairs, value.UnixNano())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) writeWALFiles(w io.Writer, files []string, key string) error {
|
||||
fmt.Fprintln(w, "# writing wal data")
|
||||
|
||||
// we need to make sure we write the same order that the wal received the data
|
||||
sort.Strings(files)
|
||||
|
||||
var once sync.Once
|
||||
warn := func() {
|
||||
msg := fmt.Sprintf(`WARNING: detected deletes in wal file.
|
||||
Some series for %q may be brought back by replaying this data.
|
||||
To resolve, you can either let the shard snapshot prior to exporting the data
|
||||
or manually editing the exported file.
|
||||
`, key)
|
||||
fmt.Fprintln(cmd.Stderr, msg)
|
||||
}
|
||||
|
||||
// use a function here to close the files in the defers and not let them accumulate in the loop
|
||||
write := func(f string) error {
|
||||
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
reader := tsm1.NewWALSegmentReader(file)
|
||||
defer reader.Close()
|
||||
for reader.Next() {
|
||||
entry, err := reader.Read()
|
||||
if err != nil {
|
||||
n := reader.Count()
|
||||
fmt.Fprintf(os.Stderr, "file %s corrupt at position %d", file.Name(), n)
|
||||
break
|
||||
}
|
||||
|
||||
switch t := entry.(type) {
|
||||
case *tsm1.DeleteWALEntry:
|
||||
once.Do(warn)
|
||||
continue
|
||||
case *tsm1.DeleteRangeWALEntry:
|
||||
once.Do(warn)
|
||||
continue
|
||||
case *tsm1.WriteWALEntry:
|
||||
var pairs string
|
||||
|
||||
for key, values := range t.Values {
|
||||
measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key))
|
||||
// measurements are stored escaped, field names are not
|
||||
field = escape.String(field)
|
||||
|
||||
for _, value := range values {
|
||||
if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch value.Value().(type) {
|
||||
case float64:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case int64:
|
||||
pairs = field + "=" + fmt.Sprintf("%vi", value.Value())
|
||||
case bool:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case string:
|
||||
pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value())))
|
||||
default:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
}
|
||||
fmt.Fprintln(w, string(measurement), pairs, value.UnixNano())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
warnDelete := func() {
|
||||
once.Do(func() {
|
||||
msg := fmt.Sprintf(`WARNING: detected deletes in wal file.
|
||||
Some series for %q may be brought back by replaying this data.
|
||||
To resolve, you can either let the shard snapshot prior to exporting the data
|
||||
or manually editing the exported file.
|
||||
`, key)
|
||||
fmt.Fprintln(cmd.Stderr, msg)
|
||||
})
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if err := write(f); err != nil {
|
||||
if err := cmd.exportWALFile(f, w, warnDelete); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// exportWAL reads every WAL entry from r and exports it to w.
|
||||
func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete func()) error {
|
||||
f, err := os.Open(walFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r := tsm1.NewWALSegmentReader(f)
|
||||
defer r.Close()
|
||||
|
||||
for r.Next() {
|
||||
entry, err := r.Read()
|
||||
if err != nil {
|
||||
n := r.Count()
|
||||
fmt.Fprintf(cmd.Stderr, "file %s corrupt at position %d", walFilePath, n)
|
||||
break
|
||||
}
|
||||
|
||||
switch t := entry.(type) {
|
||||
case *tsm1.DeleteWALEntry, *tsm1.DeleteRangeWALEntry:
|
||||
warnDelete()
|
||||
continue
|
||||
case *tsm1.WriteWALEntry:
|
||||
var pairs string
|
||||
|
||||
for key, values := range t.Values {
|
||||
measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key))
|
||||
// measurements are stored escaped, field names are not
|
||||
field = escape.String(field)
|
||||
|
||||
for _, value := range values {
|
||||
if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch value.Value().(type) {
|
||||
case float64:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case int64:
|
||||
pairs = field + "=" + fmt.Sprintf("%vi", value.Value())
|
||||
case bool:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
case string:
|
||||
pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value())))
|
||||
default:
|
||||
pairs = field + "=" + fmt.Sprintf("%v", value.Value())
|
||||
}
|
||||
fmt.Fprintln(w, string(measurement), pairs, value.UnixNano())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,3 +1,306 @@
|
|||
package export_test
|
||||
package export
|
||||
|
||||
// #TODO: write some tests
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
type corpus map[string][]tsm1.Value
|
||||
|
||||
var (
|
||||
basicCorpus = corpus{
|
||||
tsm1.SeriesFieldKey("floats,k=f", "f"): []tsm1.Value{
|
||||
tsm1.NewValue(1, float64(1.5)),
|
||||
tsm1.NewValue(2, float64(3)),
|
||||
},
|
||||
tsm1.SeriesFieldKey("ints,k=i", "i"): []tsm1.Value{
|
||||
tsm1.NewValue(10, int64(15)),
|
||||
tsm1.NewValue(20, int64(30)),
|
||||
},
|
||||
tsm1.SeriesFieldKey("bools,k=b", "b"): []tsm1.Value{
|
||||
tsm1.NewValue(100, true),
|
||||
tsm1.NewValue(200, false),
|
||||
},
|
||||
tsm1.SeriesFieldKey("strings,k=s", "s"): []tsm1.Value{
|
||||
tsm1.NewValue(1000, "1k"),
|
||||
tsm1.NewValue(2000, "2k"),
|
||||
},
|
||||
}
|
||||
|
||||
basicCorpusExpLines = []string{
|
||||
"floats,k=f f=1.5 1",
|
||||
"floats,k=f f=3 2",
|
||||
"ints,k=i i=15i 10",
|
||||
"ints,k=i i=30i 20",
|
||||
"bools,k=b b=true 100",
|
||||
"bools,k=b b=false 200",
|
||||
`strings,k=s s="1k" 1000`,
|
||||
`strings,k=s s="2k" 2000`,
|
||||
}
|
||||
)
|
||||
|
||||
func Test_exportWALFile(t *testing.T) {
|
||||
walFile := writeCorpusToWALFile(basicCorpus)
|
||||
|
||||
var out bytes.Buffer
|
||||
if err := newCommand().exportWALFile(walFile.Name(), &out, func() {}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lines := strings.Split(out.String(), "\n")
|
||||
for _, exp := range basicCorpusExpLines {
|
||||
found := false
|
||||
for _, l := range lines {
|
||||
if exp == l {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_exportTSMFile(t *testing.T) {
|
||||
tsmFile := writeCorpusToTSMFile(basicCorpus)
|
||||
defer os.Remove(tsmFile.Name())
|
||||
|
||||
var out bytes.Buffer
|
||||
if err := newCommand().exportTSMFile(tsmFile.Name(), &out); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lines := strings.Split(out.String(), "\n")
|
||||
for _, exp := range basicCorpusExpLines {
|
||||
found := false
|
||||
for _, l := range lines {
|
||||
if exp == l {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var sink interface{}
|
||||
|
||||
func benchmarkExportTSM(c corpus, b *testing.B) {
|
||||
// Garbage collection is relatively likely to happen during export, so track allocations.
|
||||
b.ReportAllocs()
|
||||
|
||||
f := writeCorpusToTSMFile(c)
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
cmd := newCommand()
|
||||
var out bytes.Buffer
|
||||
b.ResetTimer()
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err := cmd.exportTSMFile(f.Name(), &out); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
sink = out.Bytes()
|
||||
out.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExportTSMFloats_100s_250vps(b *testing.B) {
|
||||
benchmarkExportTSM(makeFloatsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportTSMInts_100s_250vps(b *testing.B) {
|
||||
benchmarkExportTSM(makeIntsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportTSMBools_100s_250vps(b *testing.B) {
|
||||
benchmarkExportTSM(makeBoolsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportTSMStrings_100s_250vps(b *testing.B) {
|
||||
benchmarkExportTSM(makeStringsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func benchmarkExportWAL(c corpus, b *testing.B) {
|
||||
// Garbage collection is relatively likely to happen during export, so track allocations.
|
||||
b.ReportAllocs()
|
||||
|
||||
f := writeCorpusToWALFile(c)
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
cmd := newCommand()
|
||||
var out bytes.Buffer
|
||||
b.ResetTimer()
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err := cmd.exportWALFile(f.Name(), &out, func() {}); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
sink = out.Bytes()
|
||||
out.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExportWALFloats_100s_250vps(b *testing.B) {
|
||||
benchmarkExportWAL(makeFloatsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportWALInts_100s_250vps(b *testing.B) {
|
||||
benchmarkExportWAL(makeIntsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportWALBools_100s_250vps(b *testing.B) {
|
||||
benchmarkExportWAL(makeBoolsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
func BenchmarkExportWALStrings_100s_250vps(b *testing.B) {
|
||||
benchmarkExportWAL(makeStringsCorpus(100, 250), b)
|
||||
}
|
||||
|
||||
// newCommand returns a command that discards its output and that accepts all timestamps.
|
||||
func newCommand() *Command {
|
||||
return &Command{
|
||||
Stderr: ioutil.Discard,
|
||||
Stdout: ioutil.Discard,
|
||||
startTime: math.MinInt64,
|
||||
endTime: math.MaxInt64,
|
||||
}
|
||||
}
|
||||
|
||||
// makeCorpus returns a new corpus filled with values generated by fn.
|
||||
// The RNG passed to fn is seeded with numSeries * numValuesPerSeries, for predictable output.
|
||||
func makeCorpus(numSeries, numValuesPerSeries int, fn func(*rand.Rand) interface{}) corpus {
|
||||
rng := rand.New(rand.NewSource(int64(numSeries) * int64(numValuesPerSeries)))
|
||||
var unixNano int64
|
||||
corpus := make(corpus, numSeries)
|
||||
for i := 0; i < numSeries; i++ {
|
||||
vals := make([]tsm1.Value, numValuesPerSeries)
|
||||
for j := 0; j < numValuesPerSeries; j++ {
|
||||
vals[j] = tsm1.NewValue(unixNano, fn(rng))
|
||||
unixNano++
|
||||
}
|
||||
|
||||
k := fmt.Sprintf("m,t=%d", i)
|
||||
corpus[tsm1.SeriesFieldKey(k, "x")] = vals
|
||||
}
|
||||
|
||||
return corpus
|
||||
}
|
||||
|
||||
func makeFloatsCorpus(numSeries, numFloatsPerSeries int) corpus {
|
||||
return makeCorpus(numSeries, numFloatsPerSeries, func(rng *rand.Rand) interface{} {
|
||||
return rng.Float64()
|
||||
})
|
||||
}
|
||||
|
||||
func makeIntsCorpus(numSeries, numIntsPerSeries int) corpus {
|
||||
return makeCorpus(numSeries, numIntsPerSeries, func(rng *rand.Rand) interface{} {
|
||||
// This will only return positive integers. That's probably okay.
|
||||
return rng.Int63()
|
||||
})
|
||||
}
|
||||
|
||||
func makeBoolsCorpus(numSeries, numBoolsPerSeries int) corpus {
|
||||
return makeCorpus(numSeries, numBoolsPerSeries, func(rng *rand.Rand) interface{} {
|
||||
return rand.Int63n(2) == 1
|
||||
})
|
||||
}
|
||||
|
||||
func makeStringsCorpus(numSeries, numStringsPerSeries int) corpus {
|
||||
return makeCorpus(numSeries, numStringsPerSeries, func(rng *rand.Rand) interface{} {
|
||||
// The string will randomly have 2-6 parts
|
||||
parts := make([]string, rand.Intn(4)+2)
|
||||
|
||||
for i := range parts {
|
||||
// Each part is a random base36-encoded number
|
||||
parts[i] = strconv.FormatInt(rand.Int63(), 36)
|
||||
}
|
||||
|
||||
// Join the individual parts with underscores.
|
||||
return strings.Join(parts, "_")
|
||||
})
|
||||
}
|
||||
|
||||
// writeCorpusToWALFile writes the given corpus as a WAL file, and returns a handle to that file.
|
||||
// It is the caller's responsibility to remove the returned temp file.
|
||||
// writeCorpusToWALFile will panic on any error that occurs.
|
||||
func writeCorpusToWALFile(c corpus) *os.File {
|
||||
walFile, err := ioutil.TempFile("", "export_test_corpus_wal")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
e := &tsm1.WriteWALEntry{Values: c}
|
||||
b, err := e.Encode(nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
w := tsm1.NewWALSegmentWriter(walFile)
|
||||
if err := w.Write(e.Type(), snappy.Encode(nil, b)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// (*tsm1.WALSegmentWriter).sync isn't exported, but it only Syncs the file anyway.
|
||||
if err := walFile.Sync(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return walFile
|
||||
}
|
||||
|
||||
// writeCorpusToTSMFile writes the given corpus as a TSM file, and returns a handle to that file.
|
||||
// It is the caller's responsibility to remove the returned temp file.
|
||||
// writeCorpusToTSMFile will panic on any error that occurs.
|
||||
func writeCorpusToTSMFile(c corpus) *os.File {
|
||||
tsmFile, err := ioutil.TempFile("", "export_test_corpus_tsm")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
w, err := tsm1.NewTSMWriter(tsmFile)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Write the series in alphabetical order so that each test run is comparable,
|
||||
// given an identical corpus.
|
||||
keys := make([]string, 0, len(c))
|
||||
for k := range c {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, k := range keys {
|
||||
if err := w.Write(k, c[k]); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return tsmFile
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue