refactor verify_seriesfile
- some style changes
- export the verify functions and make them usable
- make smaller functions for verification
- change the flags to be more intuitive

pull/9770/head
parent 9f09a3c1c2
commit 3e7a01b1db
@@ -39,7 +39,7 @@ The commands are:
     help                display this help message
     report              displays a shard level report
     verify              verifies integrity of TSM files
-    verify-seriesfile   verifies integrity of series files
+    verify-seriesfile   verifies integrity of the Series File

 "help" is the default command.
@@ -0,0 +1,78 @@
// Package verify_seriesfile verifies integrity of series files.
package verify_seriesfile

import (
	"flag"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"

	"github.com/influxdata/influxdb/logger"
	"go.uber.org/zap/zapcore"
)

// Command represents the program execution for "influx_inspect verify-seriesfile".
type Command struct {
	Stdout io.Writer
	Stderr io.Writer
}

// NewCommand returns a new instance of Command.
func NewCommand() *Command {
	return &Command{
		Stdout: os.Stdout,
		Stderr: os.Stderr,
	}
}

// Run executes the command.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("verify-seriesfile", flag.ExitOnError)
	fs.SetOutput(cmd.Stdout)

	dataDir := fs.String("dir", filepath.Join(os.Getenv("HOME"), ".influxdb", "data"), "Data directory.")
	dbName := fs.String("db", "", "Only use this database inside of the data directory.")
	seriesFile := fs.String("series-file", "", "Path to a series file. This overrides -db and -dir.")
	verbose := fs.Bool("v", false, "Verbose output.")

	if err := fs.Parse(args); err != nil {
		return err
	}

	config := logger.NewConfig()
	if *verbose {
		config.Level = zapcore.DebugLevel
	}
	logger, err := config.New(cmd.Stderr)
	if err != nil {
		return err
	}

	if *seriesFile != "" {
		_, err := VerifySeriesFile(logger, *seriesFile)
		return err
	}

	if *dbName != "" {
		_, err := VerifySeriesFile(logger, filepath.Join(*dataDir, *dbName, "_series"))
		return err
	}

	dbs, err := ioutil.ReadDir(*dataDir)
	if err != nil {
		return err
	}

	for _, db := range dbs {
		if !db.IsDir() {
			continue
		}
		_, err := VerifySeriesFile(logger, filepath.Join(*dataDir, db.Name(), "_series"))
		if err != nil {
			return err
		}
	}

	return nil
}
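Since the verify functions are now exported, other tooling can call them without going through the CLI. A minimal sketch of that, assuming an import path and an example data path (neither is stated in this diff):

package main

import (
	"fmt"
	"os"

	// assumed import path; adjust to wherever this package lives in the tree
	verify_seriesfile "github.com/influxdata/influxdb/cmd/influx_inspect/verify_seriesfile"
	"go.uber.org/zap"
)

func main() {
	// zap.NewNop() silences the verifier's logging; pass a configured
	// logger to see per-partition and per-segment diagnostics.
	valid, err := verify_seriesfile.VerifySeriesFile(zap.NewNop(), "/var/lib/influxdb/data/mydb/_series") // example path
	if err != nil {
		// err is reserved for fatal operational problems, not corruption
		fmt.Fprintln(os.Stderr, "verify failed to run:", err)
		os.Exit(1)
	}
	fmt.Println("series file valid:", valid)
}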
@@ -0,0 +1,317 @@
package verify_seriesfile

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"

	"github.com/influxdata/influxdb/tsdb"
	"go.uber.org/zap"
)

// VerifySeriesFile performs verifications on a series file. The error is only returned
// if there was some fatal problem with operating, not if there was a problem with the series file.
func VerifySeriesFile(logger *zap.Logger, filePath string) (valid bool, err error) {
	logger = logger.With(zap.String("path", filePath))
	logger.Debug("Verifying series file")

	defer func() {
		if rec := recover(); rec != nil {
			logger.Error("Panic verifying file", zap.String("recovered", fmt.Sprint(rec)))
			valid = false
		}
	}()

	partitionInfos, err := ioutil.ReadDir(filePath)
	if os.IsNotExist(err) {
		logger.Error("Series file does not exist")
		return false, nil
	}
	if err != nil {
		return false, err
	}

	// check every partition
	for _, partitionInfo := range partitionInfos {
		partitionPath := filepath.Join(filePath, partitionInfo.Name())
		if valid, err := VerifyPartition(logger, partitionPath); err != nil {
			return false, err
		} else if !valid {
			return false, nil
		}
	}

	return true, nil
}

// VerifyPartition performs verifications on a partition of a series file. The error is only returned
// if there was some fatal problem with operating, not if there was a problem with the partition.
func VerifyPartition(logger *zap.Logger, partitionPath string) (valid bool, err error) {
	logger = logger.With(zap.String("partition", filepath.Base(partitionPath)))
	logger.Debug("Verifying partition")

	defer func() {
		if rec := recover(); rec != nil {
			logger.Error("Panic verifying partition", zap.String("recovered", fmt.Sprint(rec)))
			valid = false
		}
	}()

	segmentInfos, err := ioutil.ReadDir(partitionPath)
	if err != nil {
		return false, err
	}

	segments := make([]*tsdb.SeriesSegment, 0, len(segmentInfos))
	ids := make(map[uint64]IDData)

	// check every segment
	for _, segmentInfo := range segmentInfos {
		segmentPath := filepath.Join(partitionPath, segmentInfo.Name())
		segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentInfo.Name())
		if err != nil {
			continue
		}

		if valid, err := VerifySegment(logger, segmentPath, ids); err != nil {
			return false, err
		} else if !valid {
			return false, nil
		}

		// open the segment for verifying the index. we want it to be open outside
		// the for loop as well, so the defer is ok.
		segment := tsdb.NewSeriesSegment(segmentID, segmentPath)
		if err := segment.Open(); err != nil {
			return false, err
		}
		defer segment.Close()

		segments = append(segments, segment)
	}

	// check the index
	indexPath := filepath.Join(partitionPath, "index")
	if valid, err := VerifyIndex(logger, indexPath, segments, ids); err != nil {
		return false, err
	} else if !valid {
		return false, nil
	}

	return true, nil
}

// IDData keeps track of data about a series ID.
type IDData struct {
	Offset  int64
	Key     []byte
	Deleted bool
}

// VerifySegment performs verifications on a segment of a series file. The error is only returned
// if there was some fatal problem with operating, not if there was a problem with the segment.
// The ids map is populated with information about the ids stored in the segment.
func VerifySegment(logger *zap.Logger, segmentPath string, ids map[uint64]IDData) (
	valid bool, err error) {

	segmentName := filepath.Base(segmentPath)
	logger = logger.With(zap.String("segment", segmentName))
	logger.Debug("Verifying segment")

	// Open up the segment and grab its data.
	segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentName)
	if err != nil {
		return false, err
	}
	segment := tsdb.NewSeriesSegment(segmentID, segmentPath)
	if err := segment.Open(); err != nil {
		logger.Error("Error opening segment", zap.Error(err))
		return false, nil
	}
	defer segment.Close()
	buf := newBuffer(segment.Data())

	defer func() {
		if rec := recover(); rec != nil {
			logger.Error("Panic verifying segment",
				zap.String("recovered", fmt.Sprint(rec)),
				zap.Int64("offset", buf.offset))
			valid = false
		}
	}()

	// Skip the header: it has already been verified by the Open call.
	if err := buf.advance(tsdb.SeriesSegmentHeaderSize); err != nil {
		logger.Error("Unable to advance buffer",
			zap.Int64("offset", buf.offset),
			zap.Error(err))
		return false, nil
	}

	prevID, firstID := uint64(0), true

entries:
	for len(buf.data) > 0 {
		flag, id, key, sz := tsdb.ReadSeriesEntry(buf.data)

		// Check that the flag is valid and that ids are monotonic.
		switch flag {
		case tsdb.SeriesEntryInsertFlag:
			if !firstID && prevID > id {
				logger.Error("ID is not monotonic",
					zap.Uint64("prev_id", prevID),
					zap.Uint64("id", id),
					zap.Int64("offset", buf.offset))
				return false, nil
			}

			firstID = false
			prevID = id

			if ids != nil {
				ids[id] = IDData{
					Offset: tsdb.JoinSeriesOffset(segment.ID(), uint32(buf.offset)),
					Key:    append([]byte(nil), key...),
				}
			}

		case tsdb.SeriesEntryTombstoneFlag:
			if ids != nil {
				data := ids[id]
				data.Deleted = true
				ids[id] = data
			}

		case 0: // if zero, there are no more entries
			if err := buf.advance(sz); err != nil {
				logger.Error("Unable to advance buffer",
					zap.Int64("offset", buf.offset),
					zap.Error(err))
				return false, nil
			}
			break entries

		default:
			logger.Error("Invalid flag",
				zap.Uint8("flag", flag),
				zap.Int64("offset", buf.offset))
			return false, nil
		}

		// Ensure the key parses. This may panic, but our defer handler should
		// make the error message more usable by providing the key.
		parsed := false
		func() {
			defer func() {
				if rec := recover(); rec != nil {
					logger.Error("Panic parsing key",
						zap.String("key", fmt.Sprintf("%x", key)),
						zap.Int64("offset", buf.offset),
						zap.String("recovered", fmt.Sprint(rec)))
				}
			}()
			tsdb.ParseSeriesKey(key)
			parsed = true
		}()
		if !parsed {
			return false, nil
		}

		// Advance past the entry.
		if err := buf.advance(sz); err != nil {
			logger.Error("Unable to advance buffer",
				zap.Int64("offset", buf.offset),
				zap.Error(err))
			return false, nil
		}
	}

	return true, nil
}

// VerifyIndex performs verification on an index in a series file. The error is only returned
// if there was some fatal problem with operating, not if there was a problem with the index.
// The ids map must be built from verifying the passed in segments.
func VerifyIndex(logger *zap.Logger, indexPath string, segments []*tsdb.SeriesSegment,
	ids map[uint64]IDData) (valid bool, err error) {

	logger.Debug("Verifying index")

	index := tsdb.NewSeriesIndex(indexPath)
	if err := index.Open(); err != nil {
		logger.Error("Error opening index", zap.Error(err))
		return false, nil
	}
	defer index.Close()

	if err := index.Recover(segments); err != nil {
		logger.Error("Error recovering index", zap.Error(err))
		return false, nil
	}

	// we check all the ids in a consistent order to get the same errors if
	// there is a problem
	idsList := make([]uint64, 0, len(ids))
	for id := range ids {
		idsList = append(idsList, id)
	}
	sort.Slice(idsList, func(i, j int) bool {
		return idsList[i] < idsList[j]
	})

	for _, id := range idsList {
		data := ids[id]

		expectedOffset, expectedID := data.Offset, id
		if data.Deleted {
			expectedOffset, expectedID = 0, 0
		}

		// check both that the offset is right and that we get the right
		// id for the key
		if gotOffset := index.FindOffsetByID(id); gotOffset != expectedOffset {
			logger.Error("Index inconsistency",
				zap.Uint64("id", id),
				zap.Int64("got_offset", gotOffset),
				zap.Int64("expected_offset", expectedOffset))
			return false, nil
		}

		if gotID := index.FindIDBySeriesKey(segments, data.Key); gotID != expectedID {
			logger.Error("Index inconsistency",
				zap.Uint64("id", id),
				zap.Uint64("got_id", gotID),
				zap.Uint64("expected_id", expectedID))
			return false, nil
		}
	}

	return true, nil
}

// buffer allows one to safely advance a byte slice and keep track of how many bytes were advanced.
type buffer struct {
	offset int64
	data   []byte
}

// newBuffer constructs a buffer with the provided data.
func newBuffer(data []byte) *buffer {
	return &buffer{
		offset: 0,
		data:   data,
	}
}

// advance will consume n bytes from the data slice and return an error if there is not enough
// data to do so.
func (b *buffer) advance(n int64) error {
	if int64(len(b.data)) < n {
		return fmt.Errorf("unable to advance %d bytes: %d remaining", n, len(b.data))
	}
	b.data = b.data[n:]
	b.offset += n
	return nil
}
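The buffer type at the bottom of this file is what lets VerifySegment report the byte offset of any malformed entry. A hypothetical test (not part of this commit) illustrating its semantics:

package verify_seriesfile

import "testing"

// TestBufferAdvance sketches the buffer contract: advance consumes bytes
// while tracking the absolute offset, and refuses to move past the end
// without mutating any state.
func TestBufferAdvance(t *testing.T) {
	buf := newBuffer([]byte{0xA0, 0xA1, 0xA2})

	if err := buf.advance(2); err != nil || buf.offset != 2 {
		t.Fatalf("expected clean advance to offset 2, got offset=%d err=%v", buf.offset, err)
	}

	// Only one byte remains, so advancing 5 must fail and leave the
	// offset and remaining data untouched.
	if err := buf.advance(5); err == nil || buf.offset != 2 || len(buf.data) != 1 {
		t.Fatalf("expected failed advance to be a no-op, got offset=%d len=%d", buf.offset, len(buf.data))
	}
}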
@@ -1,298 +0,0 @@
// Package verify_seriesfile verifies integrity of series files.
package verify_seriesfile

import (
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"

	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/tsdb"
	"go.uber.org/zap"
)

// Command represents the program execution for "influx_inspect verify-seriesfile".
type Command struct {
	Stderr io.Writer
	Stdout io.Writer
}

// NewCommand returns a new instance of Command.
func NewCommand() *Command {
	return &Command{
		Stderr: os.Stderr,
		Stdout: os.Stdout,
	}
}

// Run executes the command.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("verify-seriesfile", flag.ExitOnError)
	fs.SetOutput(cmd.Stdout)

	dataDir := fs.String("dir", os.Getenv("HOME")+"/.influxdb", "data directory.")
	dbPath := fs.String("db", "", "database path. overrides data directory.")
	filePath := fs.String("file", "", "series file path. overrides db and data directory.")

	if err := fs.Parse(args); err != nil {
		return err
	}

	return cmd.run(*dataDir, *dbPath, *filePath)
}

// run performs all of the validations, preferring a filePath, then a dbPath,
// and then a dataDir.
func (cmd *Command) run(dataDir, dbPath, filePath string) error {
	if filePath != "" {
		_, err := cmd.runFile(filePath)
		return err
	}
	if dbPath != "" {
		_, err := cmd.runFile(filepath.Join(dbPath, "_series"))
		return err
	}

	dbs, err := ioutil.ReadDir(filepath.Join(dataDir, "data"))
	if err != nil {
		return err
	}

	for _, db := range dbs {
		if !db.IsDir() {
			continue
		}
		_, err := cmd.runFile(filepath.Join(dataDir, "data", db.Name(), "_series"))
		if err != nil {
			return err
		}
	}

	return nil
}

// runFile performs validations on a series file. The error is only returned if
// there was some fatal problem with operating, not if there was a problem with
// the series file.
func (cmd *Command) runFile(filePath string) (valid bool, err error) {
	logger := logger.New(cmd.Stderr).With(zap.String("path", filePath))
	logger.Info("starting validation")

	defer func() {
		if rec := recover(); rec != nil {
			logger.Error("panic validating file",
				zap.String("recovered", fmt.Sprint(rec)))
			valid = false
		}
	}()

	partitionInfos, err := ioutil.ReadDir(filePath)
	if os.IsNotExist(err) {
		logger.Error("series file does not exist")
		return false, nil
	}
	if err != nil {
		return false, err
	}

	// check every partition
	for _, partitionInfo := range partitionInfos {
		pLogger := logger.With(zap.String("partition", partitionInfo.Name()))
		pLogger.Info("validating partition")
		partitionPath := filepath.Join(filePath, partitionInfo.Name())

		segmentInfos, err := ioutil.ReadDir(partitionPath)
		if err != nil {
			return false, err
		}
		segments := make([]*tsdb.SeriesSegment, 0, len(segmentInfos))
		ids := make(map[uint64]idData)

		// check every segment
		for _, segmentInfo := range segmentInfos {
			segmentPath := filepath.Join(partitionPath, segmentInfo.Name())

			segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentInfo.Name())
			if err != nil {
				continue
			}

			sLogger := pLogger.With(zap.String("segment", segmentInfo.Name()))
			sLogger.Info("validating segment")

			segment := tsdb.NewSeriesSegment(segmentID, segmentPath)
			if err := segment.Open(); err != nil {
				sLogger.Error("opening segment", zap.Error(err))
				return false, nil
			}
			defer segment.Close()

			if offset, err := cmd.validateSegment(segment, ids); err != nil {
				sLogger.Error("iterating over segment",
					zap.Int64("offset", offset),
					zap.Error(err))
				return false, nil
			}

			segments = append(segments, segment)
		}

		// validate the index if it exists
		indexPath := filepath.Join(partitionPath, "index")
		_, err = os.Stat(indexPath)
		if os.IsNotExist(err) {
			continue
		}
		if err != nil {
			return false, err
		}

		pLogger.Info("validating index")

		index := tsdb.NewSeriesIndex(indexPath)
		if err := index.Open(); err != nil {
			pLogger.Error("opening index", zap.Error(err))
			return false, nil
		}
		defer index.Close()

		if err := index.Recover(segments); err != nil {
			pLogger.Error("recovering index", zap.Error(err))
			return false, nil
		}

		// we check all the ids in a consistent order to get the same errors if
		// there is a problem
		idsList := make([]uint64, 0, len(ids))
		for id := range ids {
			idsList = append(idsList, id)
		}
		sort.Slice(idsList, func(i, j int) bool {
			return idsList[i] < idsList[j]
		})

		for _, id := range idsList {
			idData := ids[id]
			idLogger := pLogger.With(zap.Uint64("id", id))

			expectedOffset, expectedID := idData.offset, id
			if idData.deleted {
				expectedOffset, expectedID = 0, 0
			}

			// check both that the offset is right and that we get the right
			// id for the key
			if gotOffset := index.FindOffsetByID(id); gotOffset != expectedOffset {
				idLogger.Error("index inconsistency",
					zap.Int64("got_offset", gotOffset),
					zap.Int64("expected_offset", expectedOffset))
				return false, nil
			}

			if gotID := index.FindIDBySeriesKey(segments, idData.key); gotID != expectedID {
				idLogger.Error("index inconsistency",
					zap.Uint64("got_id", gotID),
					zap.Uint64("expected_id", expectedID))
				return false, nil
			}
		}
	}

	logger.Info("validation passed")
	return true, nil
}

// idData keeps track of data about a series ID.
type idData struct {
	offset  int64
	key     []byte
	deleted bool
}

// validateSegment checks that all of the entries in the segment are well formed. If there
// is any error, the offset at which it happened is returned.
func (cmd *Command) validateSegment(segment *tsdb.SeriesSegment, ids map[uint64]idData) (offset int64, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("panic validating segment: %v", rec)
		}
	}()

	data := segment.Data()

	advance := func(n int64) error {
		if int64(len(data)) < n {
			return fmt.Errorf("unable to advance %d bytes: %d remaining", n, len(data))
		}
		offset += n
		data = data[n:]
		return nil
	}

	// Skip the header: it has already been validated by the Open call.
	if err := advance(tsdb.SeriesSegmentHeaderSize); err != nil {
		return offset, err
	}

	prevID, firstID := uint64(0), true

entries:
	for len(data) > 0 {
		flag, id, key, sz := tsdb.ReadSeriesEntry(data)

		// check the flag is valid and for id monotonicity
		switch flag {
		case tsdb.SeriesEntryInsertFlag:
			if !firstID && prevID > id {
				return offset, fmt.Errorf("id is not monotonic: %d and then %d", prevID, id)
			}

			firstID = false
			prevID = id
			ids[id] = idData{
				offset: tsdb.JoinSeriesOffset(segment.ID(), uint32(offset)),
				key:    key,
			}

		case tsdb.SeriesEntryTombstoneFlag:
			data := ids[id]
			data.deleted = true
			ids[id] = data

		case 0: // if zero, there are no more entries
			if err := advance(sz); err != nil {
				return offset, err
			}
			break entries

		default:
			return offset, fmt.Errorf("invalid flag: %d", flag)
		}

		// ensure the key parses. this may panic, but our defer handler should
		// make the error message more usable.
		func() {
			defer func() {
				if rec := recover(); rec != nil {
					err = fmt.Errorf("panic parsing key: %x", key)
				}
			}()
			tsdb.ParseSeriesKey(key)
		}()
		if err != nil {
			return offset, err
		}

		// consume the entry
		if err := advance(sz); err != nil {
			return offset, err
		}
	}

	return 0, nil
}
@@ -1,7 +1,6 @@
 package verify_seriesfile

 import (
-	"bytes"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -12,31 +11,27 @@ import (

 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/tsdb"
+	"go.uber.org/zap"
 )

-func TestValidates_Valid(t *testing.T) {
+func TestVerifies_Valid(t *testing.T) {
 	test := NewTest(t)
 	defer test.Close()

-	test.CreateSeriesFile()
-
-	cmd := NewTestCommand()
-	passed, err := cmd.runFile(test.Dir)
+	passed, err := VerifySeriesFile(zap.NewNop(), test.Path)
 	test.AssertNoError(err)
 	test.Assert(passed)
 }

-func TestValidates_Invalid(t *testing.T) {
+func TestVerifies_Invalid(t *testing.T) {
 	test := NewTest(t)
 	defer test.Close()

-	test.CreateSeriesFile()
-
 	// mutate all the files in the first partition and make sure it fails. the
 	// reason we don't do every partition is to avoid quadratic time because
 	// the implementation checks the partitions in order.
-	test.AssertNoError(filepath.Walk(filepath.Join(test.Dir, "00"),
+	test.AssertNoError(filepath.Walk(filepath.Join(test.Path, "00"),
 		func(path string, info os.FileInfo, err error) error {
 			if err != nil {
 				return err
@@ -56,8 +51,7 @@ func TestValidates_Invalid(t *testing.T) {
 			test.AssertNoError(err)
 			test.AssertNoError(fh.Close())

-			cmd := NewTestCommand()
-			passed, err := cmd.runFile(test.Dir)
+			passed, err := VerifySeriesFile(zap.NewNop(), test.Path)
 			test.AssertNoError(err)
 			test.Assert(!passed)
@@ -71,23 +65,71 @@ func TestValidates_Invalid(t *testing.T) {

 type Test struct {
 	*testing.T
-	Dir string
+	Path string
 }

 func NewTest(t *testing.T) *Test {
 	t.Helper()

 	dir, err := ioutil.TempDir("", "verify-seriesfile-")
 	if err != nil {
 		t.Fatal(err)
 	}

+	// create a series file in the directory
+	err = func() error {
+		seriesFile := tsdb.NewSeriesFile(dir)
+		if err := seriesFile.Open(); err != nil {
+			return err
+		}
+		defer seriesFile.Close()
+		seriesFile.EnableCompactions()
+
+		var names [][]byte
+		var tagsSlice []models.Tags
+		const numSeries = 2 *
+			tsdb.SeriesFilePartitionN *
+			tsdb.DefaultSeriesPartitionCompactThreshold
+
+		for i := 0; i < numSeries; i++ {
+			names = append(names, []byte(fmt.Sprintf("series%d", i)))
+			tagsSlice = append(tagsSlice, nil)
+		}
+
+		_, err := seriesFile.CreateSeriesListIfNotExists(names, tagsSlice, nil)
+		if err != nil {
+			return err
+		}
+
+		// wait for compaction to make sure we detect issues with the index
+		partitions := seriesFile.Partitions()
+	wait:
+		for _, partition := range partitions {
+			if partition.Compacting() {
+				time.Sleep(100 * time.Millisecond)
+				goto wait
+			}
+		}
+
+		if err := seriesFile.Close(); err != nil {
+			return err
+		}
+
+		return nil
+	}()
+	if err != nil {
+		os.RemoveAll(dir)
+		t.Fatal(err)
+	}
+
 	return &Test{
-		T:   t,
-		Dir: dir,
+		T:    t,
+		Path: dir,
 	}
 }

 func (t *Test) Close() {
-	os.RemoveAll(t.Dir)
+	os.RemoveAll(t.Path)
 }

 func (t *Test) AssertNoError(err error) {
@@ -104,36 +146,7 @@ func (t *Test) Assert(x bool) {
 	}
 }

-func (t *Test) CreateSeriesFile() {
-	seriesFile := tsdb.NewSeriesFile(t.Dir)
-	t.AssertNoError(seriesFile.Open())
-	defer seriesFile.Close()
-
-	seriesFile.EnableCompactions()
-
-	var names [][]byte
-	var tagsSlice []models.Tags
-	for i := 0; i < 2*tsdb.SeriesFilePartitionN*tsdb.DefaultSeriesPartitionCompactThreshold; i++ {
-		names = append(names, []byte(fmt.Sprintf("series%d", i)))
-		tagsSlice = append(tagsSlice, nil)
-	}
-
-	_, err := seriesFile.CreateSeriesListIfNotExists(names, tagsSlice, nil)
-	t.AssertNoError(err)
-
-	// wait for compaction to make sure we detect issues with the index
-	partitions := seriesFile.Partitions()
-wait:
-	for _, partition := range partitions {
-		if partition.Compacting() {
-			time.Sleep(100 * time.Millisecond)
-			goto wait
-		}
-	}
-
-	t.AssertNoError(seriesFile.Close())
-}
-
 // Backup makes a copy of the path for a later Restore.
 func (t *Test) Backup(path string) {
 	in, err := os.Open(path)
 	t.AssertNoError(err)
@@ -147,23 +160,7 @@ func (t *Test) Backup(path string) {
 	t.AssertNoError(err)
 }

 // Restore restores the file at the path to the time when Backup was called last.
 func (t *Test) Restore(path string) {
 	t.AssertNoError(os.Rename(path+".backup", path))
 }
-
-type TestCommand struct{ Command }
-
-func NewTestCommand() *TestCommand {
-	return &TestCommand{Command{
-		Stderr: new(bytes.Buffer),
-		Stdout: new(bytes.Buffer),
-	}}
-}
-
-func (t *TestCommand) StdoutString() string {
-	return t.Stdout.(*bytes.Buffer).String()
-}
-
-func (t *TestCommand) StderrString() string {
-	return t.Stderr.(*bytes.Buffer).String()
-}
@@ -87,10 +87,10 @@ VERIFY OPTIONS
 VERIFY-SERIESFILE OPTIONS
 --------------
 -dir <path>::
-  Root storage path. Defaults to '~/.influxdb'.
+  Root data storage path. Defaults to '~/.influxdb/data'.

--db <path>::
-  Path to a specific db to check. Overrides '-dir'. Optional.
+-db <name>::
+  Which db to check. Optional.

--file <path>::
-  Path to a specific series file to check. Overrides '-dir' and '-db'. Optional.
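For completeness, the renamed flags documented above map directly onto Command.Run, so the command can also be driven programmatically. A small sketch; the import path and database name are placeholders, not taken from this diff:

package main

import (
	"log"

	// assumed import path; adjust to wherever this package lives in the tree
	verify_seriesfile "github.com/influxdata/influxdb/cmd/influx_inspect/verify_seriesfile"
)

func main() {
	cmd := verify_seriesfile.NewCommand()
	// equivalent to: influx_inspect verify-seriesfile -db mydb -v
	if err := cmd.Run("-db", "mydb", "-v"); err != nil {
		log.Fatal(err)
	}
}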