Merge pull request #14243 from influxdata/sgc/verify

Add influxd verify tsm-blocks
pull/14301/head
Stuart Carnie 2019-07-09 10:23:56 +10:00 committed by GitHub
commit f0f96ac7fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 244 additions and 0 deletions

View File

@ -16,6 +16,7 @@ func NewCommand() *cobra.Command {
subCommands := []*cobra.Command{
NewExportBlocksCommand(),
NewReportTSMCommand(),
NewVerifyTSMCommand(),
NewVerifyWALCommand(),
}

View File

@ -0,0 +1,72 @@
package inspect
import (
"fmt"
"os"
"path/filepath"
"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)
// verifyTSMFlags defines the `verify-tsm` Command.
var verifyTSMFlags = struct {
cli.OrgBucket
path string
}{}
func NewVerifyTSMCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "verify-tsm <pathspec>...",
Short: "Checks the consistency of TSM files",
Long: `
This command will analyze a set of TSM files for inconsistencies between the
TSM index and the blocks.
The checks performed by this command are:
* CRC-32 checksums match for each block
* TSM index min and max timestamps match decoded data
OPTIONS
<pathspec>...
A list of files or directories to search for TSM files.
An optional organization or organization and bucket may be specified to limit
the analysis.
`,
RunE: verifyTSMF,
}
verifyTSMFlags.AddFlags(cmd)
return cmd
}
func verifyTSMF(cmd *cobra.Command, args []string) error {
verify := tsm1.VerifyTSM{
Stdout: os.Stdout,
OrgID: verifyTSMFlags.Org,
BucketID: verifyTSMFlags.Bucket,
}
// resolve all pathspecs
for _, arg := range args {
fi, err := os.Stat(arg)
if err != nil {
fmt.Printf("Error processing path %q: %v", arg, err)
continue
}
if fi.IsDir() {
files, _ := filepath.Glob(filepath.Join(arg, "*."+tsm1.TSMFileExtension))
verify.Paths = append(verify.Paths, files...)
} else {
verify.Paths = append(verify.Paths, arg)
}
}
return verify.Run()
}

60
kit/cli/idflag.go Normal file
View File

@ -0,0 +1,60 @@
package cli
import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
// Wrapper for influxdb.ID
type idValue influxdb.ID
func newIDValue(val influxdb.ID, p *influxdb.ID) *idValue {
*p = val
return (*idValue)(p)
}
func (i *idValue) String() string { return influxdb.ID(*i).String() }
func (i *idValue) Set(s string) error {
id, err := influxdb.IDFromString(s)
if err != nil {
return err
}
*i = idValue(*id)
return nil
}
func (i *idValue) Type() string {
return "ID"
}
// IDVar defines an influxdb.ID flag with specified name, default value, and usage string.
// The argument p points to an influxdb.ID variable in which to store the value of the flag.
func IDVar(fs *pflag.FlagSet, p *influxdb.ID, name string, value influxdb.ID, usage string) {
IDVarP(fs, p, name, "", value, usage)
}
// IDVarP is like IDVar, but accepts a shorthand letter that can be used after a single dash.
func IDVarP(fs *pflag.FlagSet, p *influxdb.ID, name, shorthand string, value influxdb.ID, usage string) {
fs.VarP(newIDValue(value, p), name, shorthand, usage)
}
type OrgBucket struct {
Org influxdb.ID
Bucket influxdb.ID
}
func (o *OrgBucket) AddFlags(cmd *cobra.Command) {
fs := cmd.Flags()
IDVar(fs, &o.Org, "org-id", influxdb.InvalidID(), "organization id")
IDVar(fs, &o.Bucket, "bucket-id", influxdb.InvalidID(), "bucket id")
}
func (o *OrgBucket) OrgBucketID() (orgID, bucketID influxdb.ID) {
return o.Org, o.Bucket
}
func (o *OrgBucket) Name() [influxdb.IDLength]byte {
return tsdb.EncodeName(o.OrgBucketID())
}

View File

@ -27,6 +27,14 @@ func EncodeName(org, bucket platform.ID) [16]byte {
return nameBytes
}
// EncodeOrgName converts org to the tsdb internal serialization that may be used
// as a prefix when searching for keys matching a specific organization.
func EncodeOrgName(org platform.ID) [8]byte {
var orgBytes [8]byte
binary.BigEndian.PutUint64(orgBytes[0:8], uint64(org))
return orgBytes
}
// EncodeNameString converts org/bucket pairs to the tsdb internal serialization
func EncodeNameString(org, bucket platform.ID) string {
name := EncodeName(org, bucket)

103
tsdb/tsm1/verify_tsm.go Normal file
View File

@ -0,0 +1,103 @@
package tsm1
import (
"bytes"
"fmt"
"hash/crc32"
"io"
"os"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
)
type VerifyTSM struct {
Stdout io.Writer
Paths []string
OrgID influxdb.ID
BucketID influxdb.ID
}
func (v *VerifyTSM) Run() error {
for _, path := range v.Paths {
if err := v.processFile(path); err != nil {
fmt.Fprintf(v.Stdout, "Error processing file %q: %v", path, err)
}
}
return nil
}
func (v *VerifyTSM) processFile(path string) error {
fmt.Println("processing file: " + path)
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return fmt.Errorf("OpenFile: %v", err)
}
reader, err := NewTSMReader(file)
if err != nil {
return fmt.Errorf("failed to create TSM reader for %q: %v", path, err)
}
defer reader.Close()
var start []byte
if v.OrgID.Valid() {
if v.BucketID.Valid() {
v := tsdb.EncodeName(v.OrgID, v.BucketID)
start = v[:]
} else {
v := tsdb.EncodeOrgName(v.OrgID)
start = v[:]
}
}
var ts cursors.TimestampArray
count := 0
totalErrors := 0
iter := reader.Iterator(start)
for iter.Next() {
key := iter.Key()
if len(start) > 0 && (len(key) < len(start) || !bytes.Equal(key[:len(start)], start)) {
break
}
entries := iter.Entries()
for i := range entries {
entry := &entries[i]
checksum, buf, err := reader.ReadBytes(entry, nil)
if err != nil {
fmt.Fprintf(v.Stdout, "could not read block %d due to error: %q\n", count, err)
count++
continue
}
if expected := crc32.ChecksumIEEE(buf); checksum != expected {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected checksum %d, expected %d for key %v, block %d\n", checksum, expected, key, count)
}
if err = DecodeTimestampArrayBlock(buf, &ts); err != nil {
totalErrors++
fmt.Fprintf(v.Stdout, "unable to decode timestamps for block %d: %q\n", count, err)
}
if got, exp := entry.MinTime, ts.MinTime(); got != exp {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected min time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
if got, exp := entry.MaxTime, ts.MaxTime(); got != exp {
totalErrors++
fmt.Fprintf(v.Stdout, "unexpected max time %d, expected %d for block %d: %q\n", got, exp, count, err)
}
count++
}
}
fmt.Fprintf(v.Stdout, "Completed checking %d block(s)\n", count)
return nil
}