feedback: Move verify routines to `tsm1` package for consistency

Should have left it there to begin with 🤣
pull/14243/head
Stuart Carnie 2019-07-04 09:30:29 +10:00
parent 46952afe37
commit 00561d5a1b
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
6 changed files with 176 additions and 174 deletions

View File

@ -16,6 +16,7 @@ func NewCommand() *cobra.Command {
subCommands := []*cobra.Command{
NewExportBlocksCommand(),
NewReportTSMCommand(),
NewVerifyTSMCommand(),
NewVerifyWALCommand(),
}

View File

@ -0,0 +1,72 @@
package inspect
import (
"fmt"
"os"
"path/filepath"
"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)
// verifyTSMFlags defines the `verify-tsm` Command.
var verifyTSMFlags = struct {
cli.OrgBucket
path string
}{}
func NewVerifyTSMCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "verify-tsm <pathspec>...",
Short: "Checks the consistency of TSM files",
Long: `
This command will analyze a set of TSM files for inconsistencies between the
TSM index and the blocks.
The checks performed by this command are:
* CRC-32 checksums match for each block
* TSM index min and max timestamps match decoded data
OPTIONS
<pathspec>...
A list of files or directories to search for TSM files.
An optional organization or organization and bucket may be specified to limit
the analysis.
`,
RunE: verifyTSMF,
}
verifyTSMFlags.AddFlags(cmd)
return cmd
}
func verifyTSMF(cmd *cobra.Command, args []string) error {
verify := tsm1.VerifyTSM{
Stdout: os.Stdout,
OrgID: verifyTSMFlags.Org.ID,
BucketID: verifyTSMFlags.Bucket.ID,
}
// resolve all pathspecs
for _, arg := range args {
fi, err := os.Stat(arg)
if err != nil {
fmt.Printf("Error processing path %q: %v", arg, err)
continue
}
if fi.IsDir() {
files, _ := filepath.Glob(filepath.Join(arg, "*."+tsm1.TSMFileExtension))
verify.Paths = append(verify.Paths, files...)
} else {
verify.Paths = append(verify.Paths, arg)
}
}
return verify.Run()
}

View File

@ -9,7 +9,6 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/generate"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/cmd/influxd/verify"
_ "github.com/influxdata/influxdb/query/builtin"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
_ "github.com/influxdata/influxdb/tsdb/tsm1"
@ -38,7 +37,6 @@ func init() {
rootCmd.AddCommand(launcher.NewCommand())
rootCmd.AddCommand(generate.Command)
rootCmd.AddCommand(inspect.NewCommand())
rootCmd.AddCommand(verify.NewCommand())
}
// find determines the default behavior when running influxd.

View File

@ -1,147 +0,0 @@
package verify
import (
"bytes"
"fmt"
"hash/crc32"
"os"
"path/filepath"
"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)
// tsmBlocksFlags defines the `tsm-blocks` Command.
var tsmBlocksFlags = struct {
cli.OrgBucket
path string
}{}
func newTSMBlocksCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tsm-blocks <pathspec>...",
Short: "Verifies consistency of TSM blocks",
Long: `
This command will analyze a set of TSM files for inconsistencies between the
TSM index and the blocks.
The checks performed by this command are:
* CRC-32 checksums match for each block
* TSM index min and max timestamps match decoded data
OPTIONS
<pathspec>...
A list of files or directories to search for TSM files.
An optional organization or organization and bucket may be specified to limit
the analysis.
`,
Run: verifyTSMBlocks,
}
tsmBlocksFlags.AddFlags(cmd)
return cmd
}
func verifyTSMBlocks(cmd *cobra.Command, args []string) {
for _, arg := range args {
fi, err := os.Stat(arg)
if err != nil {
fmt.Printf("Error processing path %q: %v", arg, err)
continue
}
var files []string
if fi.IsDir() {
files, _ = filepath.Glob(filepath.Join(arg, "*."+tsm1.TSMFileExtension))
} else {
files = append(files, arg)
}
for _, path := range files {
if err := processFile(path); err != nil {
fmt.Printf("Error processing file %q: %v", path, err)
}
}
}
}
func processFile(path string) error {
fmt.Println("processing file: " + path)
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return fmt.Errorf("OpenFile: %v", err)
}
reader, err := tsm1.NewTSMReader(file)
if err != nil {
return fmt.Errorf("failed to create TSM reader for %q: %v", path, err)
}
defer reader.Close()
org, bucket := tsmBlocksFlags.OrgBucketID()
var start []byte
if org.Valid() {
if bucket.Valid() {
v := tsdb.EncodeName(org, bucket)
start = v[:]
} else {
v := tsdb.EncodeOrgName(org)
start = v[:]
}
}
var ts cursors.TimestampArray
count := 0
totalErrors := 0
iter := reader.Iterator(start)
for iter.Next() {
key := iter.Key()
if len(start) > 0 && (len(key) < len(start) || !bytes.Equal(key[:len(start)], start)) {
break
}
entries := iter.Entries()
for i := range entries {
entry := &entries[i]
checksum, buf, err := reader.ReadBytes(entry, nil)
if err != nil {
fmt.Printf("could not read block %d due to error: %q\n", count, err)
count++
continue
}
if expected := crc32.ChecksumIEEE(buf); checksum != expected {
totalErrors++
fmt.Printf("unexpected checksum %d, expected %d for key %v, block %d\n", checksum, expected, key, count)
}
if err = tsm1.DecodeTimestampArrayBlock(buf, &ts); err != nil {
totalErrors++
fmt.Printf("unable to decode timestamps for block %d: %q\n", count, err)
}
if got, exp := entry.MinTime, ts.MinTime(); got != exp {
totalErrors++
fmt.Printf("unexpected min time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
if got, exp := entry.MaxTime, ts.MaxTime(); got != exp {
totalErrors++
fmt.Printf("unexpected max time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
count++
}
}
fmt.Printf("Completed checking %d block(s)\n", count)
return nil
}

View File

@ -1,25 +0,0 @@
package verify
import (
"github.com/spf13/cobra"
)
// NewCommand creates the new command.
func NewCommand() *cobra.Command {
base := &cobra.Command{
Use: "verify",
Short: "Commands for verifying on-disk database data",
}
// List of available sub-commands
// If a new sub-command is created, it must be added here
subCommands := []*cobra.Command{
newTSMBlocksCommand(),
}
for _, command := range subCommands {
base.AddCommand(command)
}
return base
}

103
tsdb/tsm1/verify_tsm.go Normal file
View File

@ -0,0 +1,103 @@
package tsm1
import (
"bytes"
"fmt"
"hash/crc32"
"io"
"os"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
)
type VerifyTSM struct {
Stdout io.Writer
Paths []string
OrgID influxdb.ID
BucketID influxdb.ID
}
func (v *VerifyTSM) Run() error {
for _, path := range v.Paths {
if err := v.processFile(path); err != nil {
fmt.Fprintf(v.Stdout, "Error processing file %q: %v", path, err)
}
}
return nil
}
func (v *VerifyTSM) processFile(path string) error {
fmt.Println("processing file: " + path)
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return fmt.Errorf("OpenFile: %v", err)
}
reader, err := NewTSMReader(file)
if err != nil {
return fmt.Errorf("failed to create TSM reader for %q: %v", path, err)
}
defer reader.Close()
var start []byte
if v.OrgID.Valid() {
if v.BucketID.Valid() {
v := tsdb.EncodeName(v.OrgID, v.BucketID)
start = v[:]
} else {
v := tsdb.EncodeOrgName(v.OrgID)
start = v[:]
}
}
var ts cursors.TimestampArray
count := 0
totalErrors := 0
iter := reader.Iterator(start)
for iter.Next() {
key := iter.Key()
if len(start) > 0 && (len(key) < len(start) || !bytes.Equal(key[:len(start)], start)) {
break
}
entries := iter.Entries()
for i := range entries {
entry := &entries[i]
checksum, buf, err := reader.ReadBytes(entry, nil)
if err != nil {
fmt.Fprintf(v.Stdout, "could not read block %d due to error: %q\n", count, err)
count++
continue
}
if expected := crc32.ChecksumIEEE(buf); checksum != expected {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected checksum %d, expected %d for key %v, block %d\n", checksum, expected, key, count)
}
if err = DecodeTimestampArrayBlock(buf, &ts); err != nil {
totalErrors++
fmt.Fprintf(v.Stdout, "unable to decode timestamps for block %d: %q\n", count, err)
}
if got, exp := entry.MinTime, ts.MinTime(); got != exp {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected min time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
if got, exp := entry.MaxTime, ts.MaxTime(); got != exp {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected max time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
count++
}
}
fmt.Fprintf(v.Stdout, "Completed checking %d block(s)\n", count)
return nil
}