From 76dbc44e3cefa2a7e2b61716c87af2cd8145cfa4 Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Fri, 23 Aug 2019 13:05:06 -0700 Subject: [PATCH] feat(storage): Add influxd inspect dumpwal tool (#14237) * feat(storage/wal/dump): initial influxd inspect dumptsmwal implementation * feat(storage/wal/dump): add org bucket formatting to dumpwal tool; improve test cases * refactor(storage/wal/dump): add long description for dumpstmwal tool * refactor(storage/wal/dump): rename dumptsmwal flag * chore(storage/wal/dump): gofmt * refactor(storage/wal/dump): update error printing in dumptsmwal tool * refactor(storage/wal/dump): address review comments * refactor(storage/wal/dump): rename dumpwal command source file * refactor(storage/wal/dump): clarify print flag comment * refactor(inspect): remote unnecessary for-loop in influxd inspect command --- cmd/influxd/inspect/dump_wal.go | 57 ++++++ cmd/influxd/inspect/inspect.go | 5 +- storage/wal/dump.go | 248 ++++++++++++++++++++++++++ storage/wal/dump_test.go | 304 ++++++++++++++++++++++++++++++++ 4 files changed, 611 insertions(+), 3 deletions(-) create mode 100644 cmd/influxd/inspect/dump_wal.go create mode 100644 storage/wal/dump.go create mode 100644 storage/wal/dump_test.go diff --git a/cmd/influxd/inspect/dump_wal.go b/cmd/influxd/inspect/dump_wal.go new file mode 100644 index 0000000000..b996fd6318 --- /dev/null +++ b/cmd/influxd/inspect/dump_wal.go @@ -0,0 +1,57 @@ +package inspect + +import ( + "os" + + "github.com/influxdata/influxdb/kit/errors" + "github.com/influxdata/influxdb/storage/wal" + "github.com/spf13/cobra" +) + +var dumpWALFlags = struct { + findDuplicates bool +}{} + +func NewDumpWALCommand() *cobra.Command { + dumpTSMWALCommand := &cobra.Command{ + Use: "dumpwal", + Short: "Dump TSM data from WAL files", + Long: ` +This tool dumps data from WAL files for debugging purposes. Given a list of filepath globs +(patterns which match to .wal file paths), the tool will parse and print out the entries in each file. +It has two modes of operation, depending on the --find-duplicates flag. + +--find-duplicates=false (default): for each file, the following is printed: + * The file name + * for each entry, + * The type of the entry (either [write] or [delete-bucket-range]); + * The formatted entry contents +--find-duplicates=true: for each file, the following is printed: + * The file name + * A list of keys in the file that have out of order timestamps +`, + RunE: inspectDumpWAL, + } + + dumpTSMWALCommand.Flags().BoolVarP( + &dumpWALFlags.findDuplicates, + "find-duplicates", "", false, "ignore dumping entries; only report keys in the WAL that are out of order") + + return dumpTSMWALCommand +} + +func inspectDumpWAL(cmd *cobra.Command, args []string) error { + dumper := &wal.Dump{ + Stdout: os.Stdout, + Stderr: os.Stderr, + FileGlobs: args, + FindDuplicates: dumpWALFlags.findDuplicates, + } + + if len(args) == 0 { + return errors.New("no files provided. aborting") + } + + _, err := dumper.Run(true) + return err +} diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 6ff52f06ad..0567e425b7 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -20,11 +20,10 @@ func NewCommand() *cobra.Command { NewVerifyWALCommand(), NewReportTSICommand(), NewVerifySeriesFileCommand(), + NewDumpWALCommand(), } - for _, command := range subCommands { - base.AddCommand(command) - } + base.AddCommand(subCommands...) return base } diff --git a/storage/wal/dump.go b/storage/wal/dump.go new file mode 100644 index 0000000000..aa4ae932ff --- /dev/null +++ b/storage/wal/dump.go @@ -0,0 +1,248 @@ +package wal + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "sort" + "text/tabwriter" + + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/value" +) + +// Command represents the program execution for "influxd inspect dumpmwal +// This command will dump all entries from a given list WAL filepath globs + +type Dump struct { + // Standard input/output + Stderr io.Writer + Stdout io.Writer + + // A list of files to dump + FileGlobs []string + + // Whether or not to check for duplicate/out of order entries + FindDuplicates bool +} + +type DumpReport struct { + // The file this report corresponds to + File string + // Any keys found to be duplicated/out of order + DuplicateKeys []string + // A list of all the write wal entries from this file + Writes []*WriteWALEntry + // A list of all the delete wal entries from this file + Deletes []*DeleteBucketRangeWALEntry +} + +// Run executes the dumpwal command, generating a list of DumpReports +// for each requested file. The `print` flag indicates whether or not +// the command should log output during execution. If the command is run +// as a cli, Run(true) should be used, and if the tool is run programmatically, +// output should likely be suppressed with Run(false). +func (w *Dump) Run(print bool) ([]*DumpReport, error) { + if w.Stderr == nil { + w.Stderr = os.Stderr + } + + if w.Stdout == nil { + w.Stdout = os.Stdout + } + + if !print { + w.Stdout, w.Stderr = ioutil.Discard, ioutil.Discard + } + + twOut := tabwriter.NewWriter(w.Stdout, 8, 2, 1, ' ', 0) + twErr := tabwriter.NewWriter(w.Stderr, 8, 2, 1, ' ', 0) + + // Process each WAL file. + paths, err := globAndDedupe(w.FileGlobs) + if err != nil { + return nil, err + } + + var reports []*DumpReport + for _, path := range paths { + r, err := w.process(path, twOut, twErr) + if err != nil { + return nil, err + } + + reports = append(reports, r) + } + + return reports, nil +} + +func globAndDedupe(globs []string) ([]string, error) { + files := make(map[string]struct{}) + for _, filePattern := range globs { + matches, err := filepath.Glob(filePattern) + if err != nil { + return nil, err + } + + for _, match := range matches { + files[match] = struct{}{} + } + } + + return sortKeys(files), nil +} + +func sortKeys(m map[string]struct{}) []string { + s := make([]string, 0, len(m)) + for k := range m { + s = append(s, k) + } + sort.Strings(s) + + return s +} + +func (w *Dump) process(path string, stdout, stderr io.Writer) (*DumpReport, error) { + if filepath.Ext(path) != "."+WALFileExtension { + fmt.Fprintf(stderr, "invalid wal filename, skipping %s", path) + return nil, fmt.Errorf("invalid wal filename: %s", path) + } + + report := &DumpReport{ + File: path, + } + + fmt.Fprintf(stdout, "File: %s\n", path) + + // Track the earliest timestamp for each key and a set of keys with out-of-order points. + minTimestampByKey := make(map[string]int64) + duplicateKeys := make(map[string]struct{}) + + // Open WAL reader. + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + r := NewWALSegmentReader(f) + + // Iterate over the WAL entries + for r.Next() { + entry, err := r.Read() + if err != nil { + fmt.Fprintf(stdout, "Error: cannot read entry: %v ", err) + return nil, fmt.Errorf("cannot read entry: %v", err) + } + + switch entry := entry.(type) { + case *WriteWALEntry: + // MarshalSize must always be called to make sure the size of the entry is set + sz := entry.MarshalSize() + if !w.FindDuplicates { + fmt.Fprintf(stdout, "[write] sz=%d\n", sz) + } + report.Writes = append(report.Writes, entry) + + keys := make([]string, 0, len(entry.Values)) + for k := range entry.Values { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + fmtKey, err := formatKeyOrgBucket(k) + // if key cannot be properly formatted with org and bucket, skip printing + if err != nil { + fmt.Fprintf(stderr, "Invalid key: %v\n", err) + return nil, fmt.Errorf("invalid key: %v", err) + } + + for _, v := range entry.Values[k] { + t := v.UnixNano() + + // Skip printing if we are only showing duplicate keys. + if w.FindDuplicates { + // Check for duplicate/out of order keys. + if min, ok := minTimestampByKey[k]; ok && t <= min { + duplicateKeys[k] = struct{}{} + } + minTimestampByKey[k] = t + continue + } + + switch v := v.(type) { + case value.IntegerValue: + fmt.Fprintf(stdout, "%s %vi %d\n", fmtKey, v.Value(), t) + case value.UnsignedValue: + fmt.Fprintf(stdout, "%s %vu %d\n", fmtKey, v.Value(), t) + case value.FloatValue: + fmt.Fprintf(stdout, "%s %v %d\n", fmtKey, v.Value(), t) + case value.BooleanValue: + fmt.Fprintf(stdout, "%s %v %d\n", fmtKey, v.Value(), t) + case value.StringValue: + fmt.Fprintf(stdout, "%s %q %d\n", fmtKey, v.Value(), t) + default: + fmt.Fprintf(stdout, "%s EMPTY\n", fmtKey) + } + } + } + case *DeleteBucketRangeWALEntry: + bucketID := entry.BucketID.String() + orgID := entry.OrgID.String() + + // MarshalSize must always be called to make sure the size of the entry is set + sz := entry.MarshalSize() + if !w.FindDuplicates { + fmt.Fprintf(stdout, "[delete-bucket-range] org=%s bucket=%s min=%d max=%d sz=%d\n", orgID, bucketID, entry.Min, entry.Max, sz) + } + report.Deletes = append(report.Deletes, entry) + default: + return nil, fmt.Errorf("invalid wal entry: %#v", entry) + } + } + + // Print keys with duplicate or out-of-order points, if requested. + if w.FindDuplicates { + keys := make([]string, 0, len(duplicateKeys)) + for k := range duplicateKeys { + keys = append(keys, k) + } + sort.Strings(keys) + + fmt.Fprintln(stdout, "Duplicate/out of order keys:") + for _, k := range keys { + fmtKey, err := formatKeyOrgBucket(k) + // don't print keys that cannot be formatted with org/bucket + if err != nil { + fmt.Fprintf(stderr, "Error: %v\n", err) + continue + } + fmt.Fprintf(stdout, " %s\n", fmtKey) + } + report.DuplicateKeys = keys + } + + return report, nil +} + +// removes the first 16 bytes of the key, formats as org and bucket id (hex), +// and re-appends to the key so that it can be pretty printed +func formatKeyOrgBucket(key string) (string, error) { + b := []byte(key) + if len(b) < 16 { + return "", fmt.Errorf("key too short to format with org and bucket") + } + + var a [16]byte + copy(a[:], b[:16]) + + org, bucket := tsdb.DecodeName(a) + + s := fmt.Sprintf("%s%s", org.String(), bucket.String()) + k := s + string(b[16:]) + + return k, nil +} diff --git a/storage/wal/dump_test.go b/storage/wal/dump_test.go new file mode 100644 index 0000000000..6be89fc0d8 --- /dev/null +++ b/storage/wal/dump_test.go @@ -0,0 +1,304 @@ +package wal + +import ( + "bytes" + "encoding/binary" + "fmt" + "io/ioutil" + "os" + "sort" + "testing" + + "github.com/influxdata/influxdb/tsdb" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/tsdb/value" +) + +func TestWalDump_RunWriteEntries(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + file := mustTempWalFile(t, dir) + + w := NewWALSegmentWriter(file) + + p1 := value.NewValue(1, 1.1) + p2 := value.NewValue(1, int64(1)) + p3 := value.NewValue(1, true) + p4 := value.NewValue(1, "string") + p5 := value.NewValue(1, ^uint64(0)) + + org := influxdb.ID(1) + orgBytes := make([]byte, 8) + binary.BigEndian.PutUint64(orgBytes, uint64(org)) + bucket := influxdb.ID(2) + bucketBytes := make([]byte, 8) + binary.BigEndian.PutUint64(bucketBytes, uint64(bucket)) + prefix := string(orgBytes) + string(bucketBytes) + + values := map[string][]value.Value{ + prefix + ",cpu,host=A#!~#float": {p1}, + prefix + ",cpu,host=A#!~#int": {p2}, + prefix + ",cpu,host=A#!~#bool": {p3}, + prefix + ",cpu,host=A#!~#string": {p4}, + prefix + ",cpu,host=A#!~#unsigned": {p5}, + } + + entry := &WriteWALEntry{ + Values: values, + } + + if err := w.Write(mustMarshalEntry(entry)); err != nil { + fatal(t, "write points", err) + } + + if err := w.Flush(); err != nil { + fatal(t, "flush", err) + } + + file.Close() + + var testOut bytes.Buffer + dump := &Dump{ + Stderr: &testOut, + Stdout: &testOut, + FileGlobs: []string{file.Name()}, + } + + wantOut := fmt.Sprintf(`File: %s +[write] sz=291 +00000000000000010000000000000002,cpu,host=A#!~#bool true 1 +00000000000000010000000000000002,cpu,host=A#!~#float 1.1 1 +00000000000000010000000000000002,cpu,host=A#!~#int 1i 1 +00000000000000010000000000000002,cpu,host=A#!~#string "string" 1 +00000000000000010000000000000002,cpu,host=A#!~#unsigned 18446744073709551615u 1 +`, file.Name()) + + report, err := dump.Run(true) + if err != nil { + t.Fatal(err) + } + + gotOut := testOut.String() + + if !cmp.Equal(gotOut, wantOut) { + t.Fatalf("Error: unexpected output: %v", cmp.Diff(gotOut, wantOut)) + } + + wantReport := []*DumpReport{ + { + File: file.Name(), + Writes: []*WriteWALEntry{ + entry, + }, + }, + } + + unexported := []interface{}{ + value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0), + value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{}, + } + + if diff := cmp.Diff(report, wantReport, cmp.AllowUnexported(unexported...)); diff != "" { + t.Fatalf("Error: unexpected output: %v", diff) + } +} + +func TestWalDumpRun_DeleteRangeEntries(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + file := mustTempWalFile(t, dir) + + w := NewWALSegmentWriter(file) + entry := &DeleteBucketRangeWALEntry{ + OrgID: influxdb.ID(1), + BucketID: influxdb.ID(2), + Min: 3, + Max: 4, + Predicate: []byte("predicate"), + } + + if err := w.Write(mustMarshalEntry(entry)); err != nil { + fatal(t, "write points", err) + } + + if err := w.Flush(); err != nil { + fatal(t, "flush", err) + } + + var testOut bytes.Buffer + + dump := &Dump{ + Stderr: &testOut, + Stdout: &testOut, + FileGlobs: []string{file.Name()}, + } + + name := file.Name() + file.Close() + + report, err := dump.Run(true) + + if err != nil { + t.Fatal(err) + } + + want := fmt.Sprintf(`File: %s +[delete-bucket-range] org=0000000000000001 bucket=0000000000000002 min=3 max=4 sz=57 +`, name) + got := testOut.String() + + if !cmp.Equal(got, want) { + t.Fatalf("Unexpected output %v", cmp.Diff(got, want)) + } + + wantReport := []*DumpReport{ + { + File: file.Name(), + Deletes: []*DeleteBucketRangeWALEntry{ + entry, + }, + }, + } + + unexported := []interface{}{ + value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0), + value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{}, + } + if diff := cmp.Diff(report, wantReport, cmp.AllowUnexported(unexported...)); diff != "" { + t.Fatalf("Error: unexpected report: %v", diff) + } + +} + +func TestWalDumpRun_EntriesOutOfOrder(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + file := mustTempWalFile(t, dir) + + w := NewWALSegmentWriter(file) + + p1 := value.NewValue(1, 1.1) + p2 := value.NewValue(1, int64(1)) + p3 := value.NewValue(1, true) + p4 := value.NewValue(1, "string") + p5 := value.NewValue(1, ^uint64(0)) + + prefix := tsdb.EncodeNameString(influxdb.ID(0xDEAD), influxdb.ID(0xBEEF)) + + // write duplicate points to the WAL... + values := map[string][]value.Value{ + prefix + ",_m=cpu,host=A#!~#float": {p1}, + prefix + ",_m=cpu,host=A#!~#int": {p2}, + prefix + ",_m=cpu,host=A#!~#bool": {p3}, + prefix + ",_m=cpu,host=A#!~#string": {p4}, + prefix + ",_m=cpu,host=A#!~#unsigned": {p5}, + } + + var entries []*WriteWALEntry + + for i := 0; i < 2; i++ { + entry := &WriteWALEntry{ + Values: values, + } + if err := w.Write(mustMarshalEntry(entry)); err != nil { + t.Fatalf("error writing points: %v", err) + } + + if err := w.Flush(); err != nil { + t.Fatalf("error flushing wal: %v", err) + } + entries = append(entries, entry) + } + + name := file.Name() + file.Close() + + var testOut bytes.Buffer + dump := &Dump{ + Stderr: &testOut, + Stdout: &testOut, + FileGlobs: []string{name}, + FindDuplicates: true, + } + + report, err := dump.Run(true) + if err != nil { + t.Fatal(err) + } + + want := []*DumpReport{ + { + File: name, + DuplicateKeys: []string{ + prefix + ",_m=cpu,host=A#!~#float", + prefix + ",_m=cpu,host=A#!~#int", + prefix + ",_m=cpu,host=A#!~#bool", + prefix + ",_m=cpu,host=A#!~#string", + prefix + ",_m=cpu,host=A#!~#unsigned", + }, + Writes: entries, + }, + } + + wantOut := fmt.Sprintf(`File: %s +Duplicate/out of order keys: + 000000000000dead000000000000beef,_m=cpu,host=A#!~#bool + 000000000000dead000000000000beef,_m=cpu,host=A#!~#float + 000000000000dead000000000000beef,_m=cpu,host=A#!~#int + 000000000000dead000000000000beef,_m=cpu,host=A#!~#string + 000000000000dead000000000000beef,_m=cpu,host=A#!~#unsigned +`, name) + + gotOut := testOut.String() + + sortFunc := func(a, b string) bool { return a < b } + + unexported := []interface{}{ + value.NewBooleanValue(0, false), value.NewStringValue(0, ""), value.NewIntegerValue(0, 0), + value.NewUnsignedValue(0, 0), value.NewFloatValue(0, 0.0), WriteWALEntry{}, + } + + if diff := cmp.Diff(report, want, cmpopts.SortSlices(sortFunc), cmp.AllowUnexported(unexported...)); diff != "" { + t.Fatalf("Error: unexpected report: %v", diff) + } + + if diff := cmp.Diff(gotOut, wantOut); diff != "" { + t.Fatalf("Unexpected output: %v", diff) + } +} + +func MustTempFilePattern(dir string, pattern string) *os.File { + f, err := ioutil.TempFile(dir, pattern) + if err != nil { + panic(fmt.Sprintf("failed to create temp file: %v", err)) + } + return f +} + +func TestGlobAndDedupe(t *testing.T) { + dir := MustTempDir() + file := MustTempFilePattern(dir, "pattern") + file2 := MustTempFilePattern(dir, "pattern") + + fmt.Println(dir) + globs := []string{dir + "/*"} + paths, _ := globAndDedupe(globs) + want := []string{file.Name(), file2.Name()} + sort.Strings(want) + + if diff := cmp.Diff(paths, want); diff != "" { + t.Fatalf("Unexpected output: %v", diff) + } + + globs = append(globs, dir+"/pattern*") + paths, _ = globAndDedupe(globs) + + if diff := cmp.Diff(paths, want); diff != "" { + t.Fatalf("Unexpected output: %v", diff) + } + +}