Merge pull request #14221 from influxdata/influx-inspect-verify-wal-2.x

feat(storage): Add influxd inspect verify-wal tool
pull/14231/head
Adam Perlin 2019-06-27 16:40:45 -07:00 committed by GitHub
commit 095bfe2d81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 445 additions and 97 deletions

View File

@ -2,6 +2,8 @@
### Features
1. [14221](https://github.com/influxdata/influxdb/pull/14221): Add influxd inspect verify-wal tool
### Bug Fixes
### UI Improvements

View File

@ -1,14 +1,6 @@
package inspect
import (
"errors"
"fmt"
"os"
"path/filepath"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)
@ -19,98 +11,16 @@ func NewCommand() *cobra.Command {
Short: "Commands for inspecting on-disk database data",
}
reportTSMCommand := &cobra.Command{
Use: "report-tsm",
Short: "Run TSM report",
Long: `
This command will analyze TSM files within a storage engine directory, reporting
the cardinality within the files as well as the time range that the point data
covers.
This command only interrogates the index within each file, and does not read any
block data. To reduce heap requirements, by default report-tsm estimates the
overall cardinality in the file set by using the HLL++ algorithm. Exact
cardinalities can be determined by using the --exact flag.
For each file, the following is output:
* The full filename;
* The series cardinality within the file;
* The number of series first encountered within the file;
* The min and max timestamp associated with TSM data in the file; and
* The time taken to load the TSM index and apply any tombstones.
The summary section then outputs the total time range and series cardinality for
the fileset. Depending on the --detailed flag, series cardinality is segmented
in the following ways:
* Series cardinality for each organization;
* Series cardinality for each bucket;
* Series cardinality for each measurement;
* Number of field keys for each measurement; and
* Number of tag values for each tag key.`,
RunE: inspectReportTSMF,
// List of available sub-commands
// If a new sub-command is created, it must be added here
subCommands := []*cobra.Command{
NewReportTSMCommand(),
NewVerifyWALCommand(),
}
reportTSMCommand.Flags().StringVarP(&reportTSMFlags.pattern, "pattern", "", "", "only process TSM files containing pattern")
reportTSMCommand.Flags().BoolVarP(&reportTSMFlags.exact, "exact", "", false, "calculate and exact cardinality count. Warning, may use significant memory...")
reportTSMCommand.Flags().BoolVarP(&reportTSMFlags.detailed, "detailed", "", false, "emit series cardinality segmented by measurements, tag keys and fields. Warning, may take a while.")
reportTSMCommand.Flags().StringVarP(&reportTSMFlags.orgID, "org-id", "", "", "process only data belonging to organization ID.")
reportTSMCommand.Flags().StringVarP(&reportTSMFlags.bucketID, "bucket-id", "", "", "process only data belonging to bucket ID. Requires org flag to be set.")
dir, err := fs.InfluxDir()
if err != nil {
panic(err)
for _, command := range subCommands {
base.AddCommand(command)
}
dir = filepath.Join(dir, "engine/data")
reportTSMCommand.Flags().StringVarP(&reportTSMFlags.dataDir, "data-dir", "", dir, fmt.Sprintf("use provided data directory (defaults to %s).", dir))
base.AddCommand(reportTSMCommand)
return base
}
// reportTSMFlags defines the `report-tsm` Command.
// It is the package-level destination for the flag values that the
// report-tsm command registers; inspectReportTSMF reads it when the command runs.
var reportTSMFlags = struct {
// pattern is bound to --pattern.
pattern string
// exact is bound to --exact.
exact bool
// detailed is bound to --detailed.
detailed bool
// orgID and bucketID are bound to --org-id and --bucket-id and are kept as
// raw strings until inspectReportTSMF parses them.
orgID, bucketID string
// dataDir is bound to --data-dir.
dataDir string
}{}
// inspectReportTSMF runs the report-tsm tool.
// It builds a tsm1.Report from the package-level reportTSMFlags, parses the
// optional organization and bucket IDs, and returns the error from report.Run.
// The cmd and args parameters are not used.
func inspectReportTSMF(cmd *cobra.Command, args []string) error {
report := &tsm1.Report{
Stderr: os.Stderr,
Stdout: os.Stdout,
Dir: reportTSMFlags.dataDir,
Pattern: reportTSMFlags.pattern,
Detailed: reportTSMFlags.detailed,
Exact: reportTSMFlags.exact,
}
// A bucket filter is only accepted together with an organization filter.
if reportTSMFlags.orgID == "" && reportTSMFlags.bucketID != "" {
return errors.New("org-id must be set for non-empty bucket-id")
}
// Each ID is parsed only when its flag was supplied; a parse failure aborts the run.
if reportTSMFlags.orgID != "" {
orgID, err := influxdb.IDFromString(reportTSMFlags.orgID)
if err != nil {
return err
}
report.OrgID = orgID
}
if reportTSMFlags.bucketID != "" {
bucketID, err := influxdb.IDFromString(reportTSMFlags.bucketID)
if err != nil {
return err
}
report.BucketID = bucketID
}
// The first result of Run is discarded; only the error is propagated.
_, err := report.Run(true)
return err
}

View File

@ -0,0 +1,109 @@
package inspect
import (
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
"os"
"path/filepath"
)
// reportTSMFlags defines the `report-tsm` Command.
// It is the package-level destination for the flag values registered by
// NewReportTSMCommand; inspectReportTSMF reads it when the command runs.
var reportTSMFlags = struct {
// pattern is bound to --pattern.
pattern string
// exact is bound to --exact.
exact bool
// detailed is bound to --detailed.
detailed bool
// orgID and bucketID are bound to --org-id and --bucket-id and are kept as
// raw strings until inspectReportTSMF parses them.
orgID, bucketID string
// dataDir is bound to --data-dir.
dataDir string
}{}
// NewReportTSMCommand builds the `report-tsm` sub-command. Every flag is bound
// to a field of the package-level reportTSMFlags, and the command executes
// inspectReportTSMF. The --data-dir default is "engine/data" under the
// directory returned by fs.InfluxDir; the function panics if that directory
// cannot be determined.
func NewReportTSMCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "report-tsm",
		Short: "Run TSM report",
		Long: `
This command will analyze TSM files within a storage engine directory, reporting
the cardinality within the files as well as the time range that the point data
covers.
This command only interrogates the index within each file, and does not read any
block data. To reduce heap requirements, by default report-tsm estimates the
overall cardinality in the file set by using the HLL++ algorithm. Exact
cardinalities can be determined by using the --exact flag.
For each file, the following is output:
* The full filename;
* The series cardinality within the file;
* The number of series first encountered within the file;
* The min and max timestamp associated with TSM data in the file; and
* The time taken to load the TSM index and apply any tombstones.
The summary section then outputs the total time range and series cardinality for
the fileset. Depending on the --detailed flag, series cardinality is segmented
in the following ways:
* Series cardinality for each organization;
* Series cardinality for each bucket;
* Series cardinality for each measurement;
* Number of field keys for each measurement; and
* Number of tag values for each tag key.`,
		RunE: inspectReportTSMF,
	}

	// All flags live on the command's own flag set.
	flagSet := cmd.Flags()
	flagSet.StringVarP(&reportTSMFlags.pattern, "pattern", "", "", "only process TSM files containing pattern")
	flagSet.BoolVarP(&reportTSMFlags.exact, "exact", "", false, "calculate and exact cardinality count. Warning, may use significant memory...")
	flagSet.BoolVarP(&reportTSMFlags.detailed, "detailed", "", false, "emit series cardinality segmented by measurements, tag keys and fields. Warning, may take a while.")
	flagSet.StringVarP(&reportTSMFlags.orgID, "org-id", "", "", "process only data belonging to organization ID.")
	flagSet.StringVarP(&reportTSMFlags.bucketID, "bucket-id", "", "", "process only data belonging to bucket ID. Requires org flag to be set.")

	influxDir, err := fs.InfluxDir()
	if err != nil {
		panic(err)
	}
	defaultDataDir := filepath.Join(influxDir, "engine/data")
	dataDirUsage := fmt.Sprintf("use provided data directory (defaults to %s).", defaultDataDir)
	flagSet.StringVarP(&reportTSMFlags.dataDir, "data-dir", "", defaultDataDir, dataDirUsage)

	return cmd
}
// inspectReportTSMF runs the report-tsm tool using the values stored in
// reportTSMFlags. It rejects a bucket ID given without an organization ID,
// parses whichever IDs were supplied, and returns the error from report.Run.
func inspectReportTSMF(cmd *cobra.Command, args []string) error {
	opts := &reportTSMFlags

	// A bucket filter is meaningless without the organization it belongs to.
	if opts.bucketID != "" && opts.orgID == "" {
		return errors.New("org-id must be set for non-empty bucket-id")
	}

	report := &tsm1.Report{
		Stderr:   os.Stderr,
		Stdout:   os.Stdout,
		Dir:      opts.dataDir,
		Pattern:  opts.pattern,
		Detailed: opts.detailed,
		Exact:    opts.exact,
	}

	if opts.orgID != "" {
		id, err := influxdb.IDFromString(opts.orgID)
		if err != nil {
			return err
		}
		report.OrgID = id
	}

	if opts.bucketID != "" {
		id, err := influxdb.IDFromString(opts.bucketID)
		if err != nil {
			return err
		}
		report.BucketID = id
	}

	// Only the error matters here; the first result of Run is discarded.
	_, runErr := report.Run(true)
	return runErr
}

View File

@ -0,0 +1,57 @@
package inspect
import (
"fmt"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/storage/wal"
"github.com/spf13/cobra"
"os"
"path/filepath"
)
// NewVerifyWALCommand builds the `verify-wal` sub-command, which executes
// inspectVerifyWAL. Its single flag, --data-dir, is bound to
// verifyWALFlags.dataDir and defaults to "engine/wal" under the directory
// returned by fs.InfluxDir; the function panics if that directory cannot be
// determined.
func NewVerifyWALCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "verify-wal",
		Short: "Check for WAL corruption",
		Long: `
This command will analyze the WAL (Write-Ahead Log) in a storage directory to
check if there are any corrupt files. If any corrupt files are found, the names
of said corrupt files will be reported. The tool will also count the total number
of entries in the scanned WAL files, in case this is of interest.
For each file, the following is output:
* The file name;
* "clean" (if the file is clean) OR
The first position of any corruption that is found
In the summary section, the following is printed:
* The number of WAL files scanned;
* The number of WAL entries scanned;
* A list of files found to be corrupt`,
		RunE: inspectVerifyWAL,
	}

	influxDir, err := fs.InfluxDir()
	if err != nil {
		panic(err)
	}
	defaultWALDir := filepath.Join(influxDir, "engine/wal")
	usage := fmt.Sprintf("use provided data directory (defaults to %s).", defaultWALDir)
	cmd.Flags().StringVarP(&verifyWALFlags.dataDir, "data-dir", "", defaultWALDir, usage)

	return cmd
}
// verifyWALFlags holds the flag values for the `verify-wal` command.
// NewVerifyWALCommand binds --data-dir to it and inspectVerifyWAL reads it.
var verifyWALFlags = struct {
// dataDir is the directory handed to the WAL verifier.
dataDir string
}{}
// inspectVerifyWAL runs the verify-wal tool: it points a wal.Verifier at the
// directory stored in verifyWALFlags.dataDir, wires it to the process's
// standard streams, and returns the error from its Run method.
func inspectVerifyWAL(cmd *cobra.Command, args []string) error {
	verifier := &wal.Verifier{
		Stderr: os.Stderr,
		Stdout: os.Stdout,
		Dir:    verifyWALFlags.dataDir,
	}

	// The summary returned by Run is not needed by the CLI.
	if _, err := verifier.Run(true); err != nil {
		return err
	}
	return nil
}

View File

@ -0,0 +1,165 @@
package wal
import (
"context"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/tsdb/value"
"io/ioutil"
"math/rand"
"os"
"testing"
)
// Test describes a WAL directory prepared for a verifier test case.
// Instances are built by CreateTest and cleaned up with Close.
type Test struct {
// dir is the directory handed to the Verifier; Close removes it.
dir string
// corruptFiles lists the file paths the fixture reported as corrupt.
corruptFiles []string
}
// TestVerifyWALL_CleanFile writes a fixed number of entries through a WAL,
// closes it, and checks that the verifier counts every entry and reports no
// corrupt files.
func TestVerifyWALL_CleanFile(t *testing.T) {
	const numTestEntries = 100

	test := CreateTest(t, func() (string, []string, error) {
		dir := MustTempDir()
		w := NewWAL(dir)
		if err := w.Open(context.Background()); err != nil {
			return "", nil, errors.Wrap(err, "error opening wal")
		}

		for i := 0; i < numTestEntries; i++ {
			writeRandomEntry(w, t)
		}

		if err := w.Close(); err != nil {
			return "", nil, errors.Wrap(err, "error closing wal")
		}

		return dir, []string{}, nil
	})
	defer test.Close()

	verifier := &Verifier{Dir: test.dir}
	summary, err := verifier.Run(false)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	if summary.EntryCount != numTestEntries {
		t.Fatalf("Error: expected %d entries, checked %d entries", numTestEntries, summary.EntryCount)
	}

	// Check the length rather than comparing with nil, so the assertion does
	// not depend on whether the verifier returns a nil or an empty slice.
	if len(summary.CorruptFiles) != 0 {
		t.Fatalf("Error: expected no corrupt files, got %v", summary.CorruptFiles)
	}
}
// CreateTest invokes createFiles to build a test fixture and wraps the
// returned directory and corrupt-file list in a Test. Any error from
// createFiles fails the calling test immediately.
func CreateTest(t *testing.T, createFiles func() (string, []string, error)) *Test {
	t.Helper()

	var (
		fixture Test
		err     error
	)
	if fixture.dir, fixture.corruptFiles, err = createFiles(); err != nil {
		t.Fatal(err)
	}

	return &fixture
}
func TestVerifyWALL_CorruptFile(t *testing.T) {
test := CreateTest(t, func() (string, []string, error) {
dir := MustTempDir()
f := mustTempWalFile(t, dir)
writeCorruptEntries(f, t, 1)
path := f.Name()
return dir, []string{path}, nil
})
defer test.Close()
verifier := &Verifier{Dir: test.dir}
expectedEntries := 2 // 1 valid entry + 1 corrupt entry
summary, err := verifier.Run(false)
if err != nil {
t.Fatalf("Unexpected error when running wal verification: %v", err)
}
if summary.EntryCount != expectedEntries {
t.Fatalf("Error: expected %d entries, found %d entries", expectedEntries, summary.EntryCount)
}
want := test.corruptFiles
got := summary.CorruptFiles
lessFunc := func(a, b string) bool {return a < b}
if !cmp.Equal(summary.CorruptFiles, want, cmpopts.SortSlices(lessFunc)) {
t.Fatalf("Error: unexpected list of corrupt files %v", cmp.Diff(got, want))
}
}
// writeRandomEntry writes a single entry to w under the key
// "cpu,host=A#!~#value", holding one value built from a random timestamp and
// a random float. It fails the test if the write returns an error.
func writeRandomEntry(w *WAL, t *testing.T) {
	// Mark as a helper (as CreateTest does) so failures are attributed to the caller.
	t.Helper()

	if _, err := w.WriteMulti(context.Background(), map[string][]value.Value{
		"cpu,host=A#!~#value": {
			value.NewValue(rand.Int63(), rand.Float64()),
		},
	}); err != nil {
		t.Fatalf("error writing entry: %v", err)
	}
}
// writeCorruptEntries writes n marshalled write entries to file through a WAL
// segment writer, appends a short fixed byte sequence directly to the file to
// simulate corruption, and then closes the file. Any failure along the way
// fails the test.
func writeCorruptEntries(file *os.File, t *testing.T, n int) {
	// Mark as a helper (as CreateTest does) so failures are attributed to the caller.
	t.Helper()

	w := NewWALSegmentWriter(file)

	// Fixed bytes appended after the valid entries to act as the corrupt tail.
	corruption := []byte{1, 4, 0, 0, 0}

	p1 := value.NewValue(1, 1.1)
	values := map[string][]value.Value{
		"cpu,host=A#!~#float": {p1},
	}

	for i := 0; i < n; i++ {
		entry := &WriteWALEntry{
			Values: values,
		}
		if err := w.Write(mustMarshalEntry(entry)); err != nil {
			fatal(t, "write points", err)
		}
		// Flush after every entry so the valid data is written before the
		// corruption bytes go straight to the file below.
		if err := w.Flush(); err != nil {
			fatal(t, "flush", err)
		}
	}

	// Write the corruption bytes directly to the file, bypassing the segment writer.
	if _, err := file.Write(corruption); err != nil {
		fatal(t, "corrupt WAL segment", err)
	}

	if err := file.Close(); err != nil {
		t.Fatalf("Error: failed to close file: %v\n", err)
	}
}
// Close deletes the test's directory and everything inside it, panicking if
// the removal fails.
func (t *Test) Close() {
	if err := os.RemoveAll(t.dir); err != nil {
		panic(err)
	}
}
// mustTempWalFile creates a new file in dir whose name matches the pattern
// "corrupt*.wal" and returns it open. The test fails immediately if the file
// cannot be created. The caller is responsible for closing the file.
func mustTempWalFile(t *testing.T, dir string) *os.File {
	// Mark as a helper (as CreateTest does) so failures are attributed to the caller.
	t.Helper()

	file, err := ioutil.TempFile(dir, "corrupt*.wal")
	if err != nil {
		t.Fatal(err)
	}
	return file
}

105
storage/wal/verify.go Normal file
View File

@ -0,0 +1,105 @@
package wal
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"text/tabwriter"
"time"
)
// Verifier scans the "*.wal" files in a directory and reports corrupt entries.
type Verifier struct {
// Stderr receives error output; Run substitutes os.Stderr when it is nil.
Stderr io.Writer
// Stdout receives the report; Run substitutes os.Stdout when it is nil.
Stdout io.Writer
// Dir is the directory whose "*.wal" files are verified.
Dir string
}
// VerificationSummary holds the totals produced by a Verifier run.
type VerificationSummary struct {
// EntryCount is the number of WAL entries scanned across all files.
EntryCount int
// FileCount is the number of "*.wal" files matched in the directory.
FileCount int
// CorruptFiles holds the path of every file in which a corrupt entry was found.
CorruptFiles []string
}
// Run verifies every "*.wal" file directly inside v.Dir and returns a summary
// of what was scanned.
//
// For each file it reads entries until one fails to decode; such a file is
// recorded as corrupt and scanning moves on to the next file. A per-file
// line and a final results section are written to v.Stdout when print is
// true; when print is false the report is discarded, but the writers
// configured on v are left untouched so the Verifier can be reused.
//
// An error is returned if v.Dir cannot be stat'ed or is not a directory, if
// the glob pattern built from v.Dir is malformed, if a WAL file cannot be
// opened, or if the report cannot be flushed. No summary is returned
// alongside an error.
func (v *Verifier) Run(print bool) (*VerificationSummary, error) {
	if v.Stderr == nil {
		v.Stderr = os.Stderr
	}

	if v.Stdout == nil {
		v.Stdout = os.Stdout
	}

	// Choose the report destination locally instead of overwriting v.Stdout,
	// so a quiet run does not silence later runs on the same Verifier.
	out := v.Stdout
	if !print {
		out = ioutil.Discard
	}

	dir, err := os.Stat(v.Dir)
	if err != nil {
		return nil, err
	}
	if !dir.IsDir() {
		return nil, errors.New("invalid data directory")
	}

	// Glob only fails on a malformed pattern (e.g. an unbalanced '[' in
	// v.Dir); report that to the caller rather than panicking.
	files, err := filepath.Glob(path.Join(v.Dir, "*.wal"))
	if err != nil {
		return nil, err
	}

	start := time.Now()
	tw := tabwriter.NewWriter(out, 8, 2, 1, ' ', 0)

	var corruptFiles []string
	var entriesScanned int

	for _, fpath := range files {
		entries, clean, err := verifyWALFile(tw, fpath)
		if err != nil {
			return nil, err
		}
		entriesScanned += entries
		if !clean {
			corruptFiles = append(corruptFiles, fpath)
		}
	}

	fmt.Fprintf(tw, "Results:\n")
	fmt.Fprintf(tw, " Files checked: %d\n", len(files))
	fmt.Fprintf(tw, " Total entries checked: %d\n", entriesScanned)
	fmt.Fprintf(tw, " Corrupt files found: ")
	if len(corruptFiles) == 0 {
		fmt.Fprintf(tw, "None")
	} else {
		for _, name := range corruptFiles {
			fmt.Fprintf(tw, "\n %s", name)
		}
	}

	fmt.Fprintf(tw, "\nCompleted in %v\n", time.Since(start))

	// The tabwriter buffers its output; flush so nothing is left unwritten.
	if err := tw.Flush(); err != nil {
		return nil, err
	}

	summary := &VerificationSummary{
		EntryCount:   entriesScanned,
		CorruptFiles: corruptFiles,
		FileCount:    len(files),
	}

	return summary, nil
}

// verifyWALFile scans one WAL segment file, writing a "clean" or "corrupt
// entry" line for it to w. It returns the number of entries scanned and
// whether every one of them decoded. The file is closed before returning. A
// non-nil error means the file could not be opened at all.
func verifyWALFile(w io.Writer, fpath string) (int, bool, error) {
	f, err := os.Open(fpath)
	if err != nil {
		return 0, false, fmt.Errorf("error opening file %s: %v", fpath, err)
	}
	defer f.Close()

	entries := 0
	reader := NewWALSegmentReader(f)
	for reader.Next() {
		entries++
		if _, err := reader.Read(); err != nil {
			// Stop at the first bad entry; the rest of the file is not scanned.
			fmt.Fprintf(w, "%s: corrupt entry found at position %d\n", fpath, reader.Count())
			return entries, false, nil
		}
	}

	fmt.Fprintf(w, "%s: clean\n", fpath)
	return entries, true, nil
}