feat(storage): Add influxd inspect dumpwal tool (#14237)

* feat(storage/wal/dump): initial influxd inspect dumptsmwal implementation

* feat(storage/wal/dump): add org bucket formatting to dumpwal tool; improve test cases

* refactor(storage/wal/dump): add long description for dumptsmwal tool

* refactor(storage/wal/dump): rename dumptsmwal flag

* chore(storage/wal/dump): gofmt

* refactor(storage/wal/dump): update error printing in dumptsmwal tool

* refactor(storage/wal/dump): address review comments

* refactor(storage/wal/dump): rename dumpwal command source file

* refactor(storage/wal/dump): clarify print flag comment

* refactor(inspect): remove unnecessary for-loop in influxd inspect command
pull/14789/head
Adam Perlin 2019-08-23 13:05:06 -07:00 committed by GitHub
parent 0cc9caa1d4
commit 76dbc44e3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 611 additions and 3 deletions

View File

@ -0,0 +1,57 @@
package inspect
import (
"os"
"github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/storage/wal"
"github.com/spf13/cobra"
)
// dumpWALFlags holds the parsed command-line flags for "influxd inspect dumpwal".
var dumpWALFlags = struct {
	// When true, skip dumping entries and only report keys whose points
	// are duplicated or out of order.
	findDuplicates bool
}{}
// NewDumpWALCommand builds the "dumpwal" subcommand of "influxd inspect",
// which parses WAL files matching the given globs and prints their entries.
func NewDumpWALCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "dumpwal",
		Short: "Dump TSM data from WAL files",
		Long: `
This tool dumps data from WAL files for debugging purposes. Given a list of filepath globs
(patterns which match to .wal file paths), the tool will parse and print out the entries in each file.
It has two modes of operation, depending on the --find-duplicates flag.
--find-duplicates=false (default): for each file, the following is printed:
* The file name
* for each entry,
* The type of the entry (either [write] or [delete-bucket-range]);
* The formatted entry contents
--find-duplicates=true: for each file, the following is printed:
* The file name
* A list of keys in the file that have out of order timestamps
`,
		RunE: inspectDumpWAL,
	}

	// No shorthand letter is registered for this flag.
	cmd.Flags().BoolVar(
		&dumpWALFlags.findDuplicates,
		"find-duplicates", false, "ignore dumping entries; only report keys in the WAL that are out of order")

	return cmd
}
// inspectDumpWAL is the RunE handler for the "dumpwal" subcommand. It treats
// the positional arguments as filepath globs and dumps the matching WAL files.
func inspectDumpWAL(cmd *cobra.Command, args []string) error {
	// Validate input before doing any work; the original constructed the
	// dumper first and only then rejected an empty argument list.
	if len(args) == 0 {
		return errors.New("no files provided. aborting")
	}

	dumper := &wal.Dump{
		Stdout:         os.Stdout,
		Stderr:         os.Stderr,
		FileGlobs:      args,
		FindDuplicates: dumpWALFlags.findDuplicates,
	}

	// Run with printing enabled (CLI usage); the returned reports are only
	// useful to programmatic callers, so they are discarded here.
	_, err := dumper.Run(true)
	return err
}

View File

@ -20,11 +20,10 @@ func NewCommand() *cobra.Command {
NewVerifyWALCommand(),
NewReportTSICommand(),
NewVerifySeriesFileCommand(),
NewDumpWALCommand(),
}
for _, command := range subCommands {
base.AddCommand(command)
}
base.AddCommand(subCommands...)
return base
}

248
storage/wal/dump.go Normal file
View File

@ -0,0 +1,248 @@
package wal
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"text/tabwriter"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/value"
)
// Dump represents the program execution for "influxd inspect dumpwal".
// It dumps all entries from a given list of WAL filepath globs.
type Dump struct {
	// Standard input/output
	Stderr io.Writer
	Stdout io.Writer

	// A list of files to dump
	FileGlobs []string

	// Whether or not to check for duplicate/out of order entries
	FindDuplicates bool
}
// DumpReport summarizes the contents of a single WAL file processed by Dump.
type DumpReport struct {
	// The file this report corresponds to
	File string

	// Any keys found to be duplicated/out of order
	DuplicateKeys []string

	// A list of all the write wal entries from this file
	Writes []*WriteWALEntry

	// A list of all the delete wal entries from this file
	Deletes []*DeleteBucketRangeWALEntry
}
// Run executes the dumpwal command, producing one DumpReport per requested
// file. The print flag controls whether the command logs output while it
// runs: pass true for CLI usage, false to suppress all output when the tool
// is driven programmatically.
func (w *Dump) Run(print bool) ([]*DumpReport, error) {
	// Fall back to the process-wide streams when none were supplied.
	if w.Stderr == nil {
		w.Stderr = os.Stderr
	}
	if w.Stdout == nil {
		w.Stdout = os.Stdout
	}

	// Silent mode: route everything to the bit bucket.
	if !print {
		w.Stdout = ioutil.Discard
		w.Stderr = ioutil.Discard
	}

	// Column-align the human-readable output.
	twOut := tabwriter.NewWriter(w.Stdout, 8, 2, 1, ' ', 0)
	twErr := tabwriter.NewWriter(w.Stderr, 8, 2, 1, ' ', 0)

	// Expand the globs once, removing any overlap between patterns.
	paths, err := globAndDedupe(w.FileGlobs)
	if err != nil {
		return nil, err
	}

	// Process each WAL file in sorted order.
	var reports []*DumpReport
	for _, path := range paths {
		report, err := w.process(path, twOut, twErr)
		if err != nil {
			return nil, err
		}
		reports = append(reports, report)
	}

	return reports, nil
}
func globAndDedupe(globs []string) ([]string, error) {
files := make(map[string]struct{})
for _, filePattern := range globs {
matches, err := filepath.Glob(filePattern)
if err != nil {
return nil, err
}
for _, match := range matches {
files[match] = struct{}{}
}
}
return sortKeys(files), nil
}
// sortKeys returns the keys of m as a lexicographically sorted slice.
func sortKeys(m map[string]struct{}) []string {
	keys := make([]string, 0, len(m))
	for key := range m {
		keys = append(keys, key)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys
}
// process parses a single WAL file and writes a human-readable dump of its
// entries to stdout, returning a DumpReport describing what was found.
// Diagnostics go to stderr. When w.FindDuplicates is set, entry printing is
// suppressed and only keys with duplicate/out-of-order timestamps are listed.
func (w *Dump) process(path string, stdout, stderr io.Writer) (*DumpReport, error) {
	if filepath.Ext(path) != "."+WALFileExtension {
		fmt.Fprintf(stderr, "invalid wal filename, skipping %s", path)
		return nil, fmt.Errorf("invalid wal filename: %s", path)
	}

	report := &DumpReport{
		File: path,
	}

	fmt.Fprintf(stdout, "File: %s\n", path)

	// Track the earliest timestamp for each key and a set of keys with out-of-order points.
	minTimestampByKey := make(map[string]int64)
	duplicateKeys := make(map[string]struct{})

	// Open WAL reader.
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	r := NewWALSegmentReader(f)

	// Iterate over the WAL entries.
	for r.Next() {
		entry, err := r.Read()
		if err != nil {
			// This diagnostic was previously written to stdout (and without a
			// trailing newline); it is an error, so report it on stderr like
			// the other failure paths.
			fmt.Fprintf(stderr, "Error: cannot read entry: %v\n", err)
			return nil, fmt.Errorf("cannot read entry: %v", err)
		}

		switch entry := entry.(type) {
		case *WriteWALEntry:
			// MarshalSize must always be called to make sure the size of the entry is set.
			sz := entry.MarshalSize()
			if !w.FindDuplicates {
				fmt.Fprintf(stdout, "[write] sz=%d\n", sz)
			}
			report.Writes = append(report.Writes, entry)

			// Print the entry's keys in deterministic (sorted) order.
			keys := make([]string, 0, len(entry.Values))
			for k := range entry.Values {
				keys = append(keys, k)
			}
			sort.Strings(keys)

			for _, k := range keys {
				fmtKey, err := formatKeyOrgBucket(k)
				// If the key cannot be properly formatted with org and bucket, skip printing.
				if err != nil {
					fmt.Fprintf(stderr, "Invalid key: %v\n", err)
					return nil, fmt.Errorf("invalid key: %v", err)
				}

				for _, v := range entry.Values[k] {
					t := v.UnixNano()

					// Skip printing if we are only showing duplicate keys.
					if w.FindDuplicates {
						// Check for duplicate/out of order keys: a timestamp at or
						// before the earliest one seen for this key is out of order.
						if min, ok := minTimestampByKey[k]; ok && t <= min {
							duplicateKeys[k] = struct{}{}
						}
						minTimestampByKey[k] = t
						continue
					}

					switch v := v.(type) {
					case value.IntegerValue:
						fmt.Fprintf(stdout, "%s %vi %d\n", fmtKey, v.Value(), t)
					case value.UnsignedValue:
						fmt.Fprintf(stdout, "%s %vu %d\n", fmtKey, v.Value(), t)
					case value.FloatValue:
						fmt.Fprintf(stdout, "%s %v %d\n", fmtKey, v.Value(), t)
					case value.BooleanValue:
						fmt.Fprintf(stdout, "%s %v %d\n", fmtKey, v.Value(), t)
					case value.StringValue:
						fmt.Fprintf(stdout, "%s %q %d\n", fmtKey, v.Value(), t)
					default:
						fmt.Fprintf(stdout, "%s EMPTY\n", fmtKey)
					}
				}
			}
		case *DeleteBucketRangeWALEntry:
			bucketID := entry.BucketID.String()
			orgID := entry.OrgID.String()

			// MarshalSize must always be called to make sure the size of the entry is set.
			sz := entry.MarshalSize()
			if !w.FindDuplicates {
				fmt.Fprintf(stdout, "[delete-bucket-range] org=%s bucket=%s min=%d max=%d sz=%d\n", orgID, bucketID, entry.Min, entry.Max, sz)
			}
			report.Deletes = append(report.Deletes, entry)
		default:
			return nil, fmt.Errorf("invalid wal entry: %#v", entry)
		}
	}

	// Print keys with duplicate or out-of-order points, if requested.
	if w.FindDuplicates {
		keys := make([]string, 0, len(duplicateKeys))
		for k := range duplicateKeys {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		fmt.Fprintln(stdout, "Duplicate/out of order keys:")
		for _, k := range keys {
			fmtKey, err := formatKeyOrgBucket(k)
			// Don't print keys that cannot be formatted with org/bucket.
			if err != nil {
				fmt.Fprintf(stderr, "Error: %v\n", err)
				continue
			}
			fmt.Fprintf(stdout, " %s\n", fmtKey)
		}
		report.DuplicateKeys = keys
	}

	return report, nil
}
// formatKeyOrgBucket decodes the first 16 bytes of key as an (org ID, bucket
// ID) pair, formats both as hex, and returns the key with that prefix replaced
// by the formatted IDs so it can be pretty printed. It returns an error when
// the key is shorter than the 16-byte prefix.
func formatKeyOrgBucket(key string) (string, error) {
	if len(key) < 16 {
		return "", fmt.Errorf("key too short to format with org and bucket")
	}

	// tsdb.DecodeName takes a fixed-size array; copy the prefix in.
	var id [16]byte
	copy(id[:], key[:16])
	org, bucket := tsdb.DecodeName(id)

	// Avoid the original's []byte round-trip and Sprintf: plain string
	// concatenation produces the identical result.
	return org.String() + bucket.String() + key[16:], nil
}

304
storage/wal/dump_test.go Normal file
View File

@ -0,0 +1,304 @@
package wal
import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"os"
"sort"
"testing"
"github.com/influxdata/influxdb/tsdb"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb/value"
)
// TestWalDump_RunWriteEntries writes one point of each supported field type to
// a WAL segment, then verifies that Dump.Run prints each value with its
// formatted org/bucket key and returns a matching DumpReport.
func TestWalDump_RunWriteEntries(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	file := mustTempWalFile(t, dir)

	w := NewWALSegmentWriter(file)

	// One value per field type (float, int, bool, string, unsigned), all at timestamp 1.
	p1 := value.NewValue(1, 1.1)
	p2 := value.NewValue(1, int64(1))
	p3 := value.NewValue(1, true)
	p4 := value.NewValue(1, "string")
	p5 := value.NewValue(1, ^uint64(0))

	// Build the 16-byte org+bucket key prefix by hand (org=1, bucket=2).
	org := influxdb.ID(1)
	orgBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(orgBytes, uint64(org))
	bucket := influxdb.ID(2)
	bucketBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(bucketBytes, uint64(bucket))
	prefix := string(orgBytes) + string(bucketBytes)

	values := map[string][]value.Value{
		prefix + ",cpu,host=A#!~#float":    {p1},
		prefix + ",cpu,host=A#!~#int":      {p2},
		prefix + ",cpu,host=A#!~#bool":     {p3},
		prefix + ",cpu,host=A#!~#string":   {p4},
		prefix + ",cpu,host=A#!~#unsigned": {p5},
	}

	entry := &WriteWALEntry{
		Values: values,
	}

	if err := w.Write(mustMarshalEntry(entry)); err != nil {
		fatal(t, "write points", err)
	}
	if err := w.Flush(); err != nil {
		fatal(t, "flush", err)
	}
	file.Close()

	// Capture stdout and stderr in the same buffer for comparison.
	var testOut bytes.Buffer
	dump := &Dump{
		Stderr:    &testOut,
		Stdout:    &testOut,
		FileGlobs: []string{file.Name()},
	}

	// Golden output: entries are printed in sorted key order.
	wantOut := fmt.Sprintf(`File: %s
[write] sz=291
00000000000000010000000000000002,cpu,host=A#!~#bool true 1
00000000000000010000000000000002,cpu,host=A#!~#float 1.1 1
00000000000000010000000000000002,cpu,host=A#!~#int 1i 1
00000000000000010000000000000002,cpu,host=A#!~#string "string" 1
00000000000000010000000000000002,cpu,host=A#!~#unsigned 18446744073709551615u 1
`, file.Name())

	report, err := dump.Run(true)
	if err != nil {
		t.Fatal(err)
	}

	gotOut := testOut.String()
	if !cmp.Equal(gotOut, wantOut) {
		t.Fatalf("Error: unexpected output: %v", cmp.Diff(gotOut, wantOut))
	}

	// The report should contain exactly the single write entry.
	wantReport := []*DumpReport{
		{
			File: file.Name(),
			Writes: []*WriteWALEntry{
				entry,
			},
		},
	}

	// Allow cmp to inspect the unexported fields of these value/entry types.
	unexported := []interface{}{
		value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0),
		value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{},
	}
	if diff := cmp.Diff(report, wantReport, cmp.AllowUnexported(unexported...)); diff != "" {
		t.Fatalf("Error: unexpected output: %v", diff)
	}
}
// TestWalDumpRun_DeleteRangeEntries verifies that Dump.Run formats a
// delete-bucket-range WAL entry with its org/bucket IDs and time range, and
// returns it in the report's Deletes list.
func TestWalDumpRun_DeleteRangeEntries(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	file := mustTempWalFile(t, dir)

	w := NewWALSegmentWriter(file)

	// A delete spanning timestamps [3, 4] for org 1 / bucket 2.
	entry := &DeleteBucketRangeWALEntry{
		OrgID:     influxdb.ID(1),
		BucketID:  influxdb.ID(2),
		Min:       3,
		Max:       4,
		Predicate: []byte("predicate"),
	}

	if err := w.Write(mustMarshalEntry(entry)); err != nil {
		fatal(t, "write points", err)
	}
	if err := w.Flush(); err != nil {
		fatal(t, "flush", err)
	}

	// Capture stdout and stderr in the same buffer for comparison.
	var testOut bytes.Buffer
	dump := &Dump{
		Stderr:    &testOut,
		Stdout:    &testOut,
		FileGlobs: []string{file.Name()},
	}

	name := file.Name()
	file.Close()

	report, err := dump.Run(true)
	if err != nil {
		t.Fatal(err)
	}

	want := fmt.Sprintf(`File: %s
[delete-bucket-range] org=0000000000000001 bucket=0000000000000002 min=3 max=4 sz=57
`, name)
	got := testOut.String()
	if !cmp.Equal(got, want) {
		t.Fatalf("Unexpected output %v", cmp.Diff(got, want))
	}

	// The report should contain exactly the single delete entry.
	wantReport := []*DumpReport{
		{
			File: file.Name(),
			Deletes: []*DeleteBucketRangeWALEntry{
				entry,
			},
		},
	}

	// Allow cmp to inspect the unexported fields of these value/entry types.
	unexported := []interface{}{
		value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0),
		value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{},
	}
	if diff := cmp.Diff(report, wantReport, cmp.AllowUnexported(unexported...)); diff != "" {
		t.Fatalf("Error: unexpected report: %v", diff)
	}
}
// TestWalDumpRun_EntriesOutOfOrder writes the same points twice (identical
// timestamps) and verifies that Run with FindDuplicates=true reports every key
// as duplicate/out-of-order instead of dumping the entries.
func TestWalDumpRun_EntriesOutOfOrder(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	file := mustTempWalFile(t, dir)

	w := NewWALSegmentWriter(file)

	// One value per field type, all at timestamp 1.
	p1 := value.NewValue(1, 1.1)
	p2 := value.NewValue(1, int64(1))
	p3 := value.NewValue(1, true)
	p4 := value.NewValue(1, "string")
	p5 := value.NewValue(1, ^uint64(0))

	prefix := tsdb.EncodeNameString(influxdb.ID(0xDEAD), influxdb.ID(0xBEEF))

	// write duplicate points to the WAL...
	values := map[string][]value.Value{
		prefix + ",_m=cpu,host=A#!~#float":    {p1},
		prefix + ",_m=cpu,host=A#!~#int":      {p2},
		prefix + ",_m=cpu,host=A#!~#bool":     {p3},
		prefix + ",_m=cpu,host=A#!~#string":   {p4},
		prefix + ",_m=cpu,host=A#!~#unsigned": {p5},
	}

	// Two identical entries => every key's second timestamp is <= the first,
	// which the dumper flags as duplicate/out of order.
	var entries []*WriteWALEntry
	for i := 0; i < 2; i++ {
		entry := &WriteWALEntry{
			Values: values,
		}

		if err := w.Write(mustMarshalEntry(entry)); err != nil {
			t.Fatalf("error writing points: %v", err)
		}
		if err := w.Flush(); err != nil {
			t.Fatalf("error flushing wal: %v", err)
		}
		entries = append(entries, entry)
	}

	name := file.Name()
	file.Close()

	// Capture stdout and stderr in the same buffer for comparison.
	var testOut bytes.Buffer
	dump := &Dump{
		Stderr:         &testOut,
		Stdout:         &testOut,
		FileGlobs:      []string{name},
		FindDuplicates: true,
	}

	report, err := dump.Run(true)
	if err != nil {
		t.Fatal(err)
	}

	want := []*DumpReport{
		{
			File: name,
			DuplicateKeys: []string{
				prefix + ",_m=cpu,host=A#!~#float",
				prefix + ",_m=cpu,host=A#!~#int",
				prefix + ",_m=cpu,host=A#!~#bool",
				prefix + ",_m=cpu,host=A#!~#string",
				prefix + ",_m=cpu,host=A#!~#unsigned",
			},
			Writes: entries,
		},
	}

	// Golden output: only the duplicate-key listing, in sorted key order.
	wantOut := fmt.Sprintf(`File: %s
Duplicate/out of order keys:
000000000000dead000000000000beef,_m=cpu,host=A#!~#bool
000000000000dead000000000000beef,_m=cpu,host=A#!~#float
000000000000dead000000000000beef,_m=cpu,host=A#!~#int
000000000000dead000000000000beef,_m=cpu,host=A#!~#string
000000000000dead000000000000beef,_m=cpu,host=A#!~#unsigned
`, name)

	gotOut := testOut.String()

	// DuplicateKeys ordering in the report is not asserted directly; sort both
	// sides and allow cmp to inspect unexported fields of the value types.
	sortFunc := func(a, b string) bool { return a < b }

	unexported := []interface{}{
		value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0),
		value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{},
	}

	if diff := cmp.Diff(report, want, cmpopts.SortSlices(sortFunc), cmp.AllowUnexported(unexported...)); diff != "" {
		t.Fatalf("Error: unexpected report: %v", diff)
	}

	if diff := cmp.Diff(gotOut, wantOut); diff != "" {
		t.Fatalf("Unexpected output: %v", diff)
	}
}
func MustTempFilePattern(dir string, pattern string) *os.File {
f, err := ioutil.TempFile(dir, pattern)
if err != nil {
panic(fmt.Sprintf("failed to create temp file: %v", err))
}
return f
}
// TestGlobAndDedupe verifies that globAndDedupe returns a sorted,
// de-duplicated list of paths even when multiple globs match the same files.
func TestGlobAndDedupe(t *testing.T) {
	dir := MustTempDir()
	// The original test leaked this directory and left a debug Println behind.
	defer os.RemoveAll(dir)
	file := MustTempFilePattern(dir, "pattern")
	file2 := MustTempFilePattern(dir, "pattern")

	globs := []string{dir + "/*"}
	paths, err := globAndDedupe(globs)
	if err != nil {
		t.Fatalf("unexpected error from globAndDedupe: %v", err)
	}
	want := []string{file.Name(), file2.Name()}
	sort.Strings(want)
	if diff := cmp.Diff(paths, want); diff != "" {
		t.Fatalf("Unexpected output: %v", diff)
	}

	// Adding an overlapping glob must not introduce duplicate paths.
	globs = append(globs, dir+"/pattern*")
	paths, err = globAndDedupe(globs)
	if err != nil {
		t.Fatalf("unexpected error from globAndDedupe: %v", err)
	}
	if diff := cmp.Diff(paths, want); diff != "" {
		t.Fatalf("Unexpected output: %v", diff)
	}
}