chore: fix deadlock in `influx_inspect dumptsi` (#22714)

pull/22726/head
Dane Strandboge 2021-10-20 13:08:34 -05:00 committed by GitHub
parent 6448e166a9
commit 68e2455805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 177 additions and 24 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)
@ -113,9 +114,9 @@ func (cmd *Command) Run(args ...string) error {
return cmd.run()
}
func (cmd *Command) run() error {
func (cmd *Command) run() (rErr error) {
sfile := tsdb.NewSeriesFile(cmd.seriesFilePath)
sfile.Logger = logger.New(os.Stderr)
sfile.Logger = logger.New(cmd.Stderr)
if err := sfile.Open(); err != nil {
return err
}
@ -126,6 +127,13 @@ func (cmd *Command) run() error {
if err != nil {
return err
}
if fs != nil {
defer errors2.Capture(&rErr, fs.Close)()
defer fs.Release()
}
if idx != nil {
defer errors2.Capture(&rErr, idx.Close)()
}
if cmd.showSeries {
if err := cmd.printSeries(sfile); err != nil {
@ -135,8 +143,6 @@ func (cmd *Command) run() error {
// If this is an ad-hoc fileset then process it and close afterward.
if fs != nil {
defer fs.Release()
defer fs.Close()
if cmd.showSeries || cmd.showMeasurements {
return cmd.printMeasurements(sfile, fs)
}
@ -144,7 +150,6 @@ func (cmd *Command) run() error {
}
// Otherwise iterate over each partition in the index.
defer idx.Close()
for i := 0; i < int(idx.PartitionN); i++ {
if err := func() error {
fs, err := idx.PartitionAt(i).RetainFileSet()
@ -169,13 +174,13 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
if len(cmd.paths) == 1 {
fi, err := os.Stat(cmd.paths[0])
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to get FileInfo of %q: %w", cmd.paths[0], err)
} else if fi.IsDir() {
// Verify directory is an index before opening it.
if ok, err := tsi1.IsIndexDir(cmd.paths[0]); err != nil {
return nil, nil, err
} else if !ok {
return nil, nil, fmt.Errorf("Not an index directory: %q", cmd.paths[0])
return nil, nil, fmt.Errorf("not an index directory: %q", cmd.paths[0])
}
idx := tsi1.NewIndex(sfile,
@ -184,7 +189,7 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
tsi1.DisableCompactions(),
)
if err := idx.Open(); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to open TSI Index at %q: %w", idx.Path(), err)
}
return idx, nil, nil
}
@ -197,7 +202,7 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
case tsi1.LogFileExt:
f := tsi1.NewLogFile(sfile, path)
if err := f.Open(); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to get TSI logfile at %q: %w", sfile.Path(), err)
}
files = append(files, f)
@ -205,7 +210,7 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
f := tsi1.NewIndexFile(sfile)
f.SetPath(path)
if err := f.Open(); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to open index file at %q: %w", f.Path(), err)
}
files = append(files, f)
@ -234,7 +239,7 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
for {
e, err := itr.Next()
if err != nil {
return err
return fmt.Errorf("failed to get next series ID in %q: %w", sfile.Path(), err)
} else if e.SeriesID == 0 {
break
}
@ -251,7 +256,7 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
// Flush & write footer spacing.
if err := tw.Flush(); err != nil {
return err
return fmt.Errorf("failed to flush tabwriter: %w", err)
}
fmt.Fprint(cmd.Stdout, "\n\n")
@ -275,7 +280,7 @@ func (cmd *Command) printMeasurements(sfile *tsdb.SeriesFile, fs *tsi1.FileSet)
fmt.Fprintf(tw, "%s\t%v\n", e.Name(), deletedString(e.Deleted()))
if err := tw.Flush(); err != nil {
return err
return fmt.Errorf("failed to flush tabwriter: %w", err)
}
if err := cmd.printTagKeys(sfile, fs, e.Name()); err != nil {
@ -304,7 +309,7 @@ func (cmd *Command) printTagKeys(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, name
fmt.Fprintf(tw, " %s\t%v\n", e.Key(), deletedString(e.Deleted()))
if err := tw.Flush(); err != nil {
return err
return fmt.Errorf("failed to flush tabwriter: %w", err)
}
if err := cmd.printTagValues(sfile, fs, name, e.Key()); err != nil {
@ -331,7 +336,7 @@ func (cmd *Command) printTagValues(sfile *tsdb.SeriesFile, fs *tsi1.FileSet, nam
fmt.Fprintf(tw, " %s\t%v\n", e.Value(), deletedString(e.Deleted()))
if err := tw.Flush(); err != nil {
return err
return fmt.Errorf("failed to flush tabwriter: %w", err)
}
if err := cmd.printTagValueSeries(sfile, fs, name, key, e.Value()); err != nil {
@ -352,12 +357,12 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
itr, err := fs.TagValueSeriesIDIterator(name, key, value)
if err != nil {
return err
return fmt.Errorf("failed to get series ID iterator with name %q: %w", name, err)
}
for {
e, err := itr.Next()
if err != nil {
return err
return fmt.Errorf("failed to print tag value series: %w", err)
} else if e.SeriesID == 0 {
break
}
@ -370,7 +375,7 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
fmt.Fprintf(tw, " %s%s\n", name, tags.HashKey())
if err := tw.Flush(); err != nil {
return err
return fmt.Errorf("failed to flush tabwriter: %w", err)
}
}
fmt.Fprint(cmd.Stdout, "\n")

View File

@ -0,0 +1,45 @@
package dumptsi_test
import (
"bytes"
"io/ioutil"
"os"
"testing"
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi"
"github.com/influxdata/influxdb/pkg/tar"
"github.com/stretchr/testify/require"
)
// Test_DumpTSI_NoError runs `influx_inspect dumptsi` against a canned TSI log
// file and verifies the command succeeds and reports the expected totals.
func Test_DumpTSI_NoError(t *testing.T) {
	cmd := dumptsi.NewCommand()

	// Capture the command's stdout so its report can be inspected.
	var output bytes.Buffer
	cmd.Stdout = &output

	// Temp directory the fixture archive is extracted into.
	dir, err := ioutil.TempDir("", "dumptsitest-")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Extract the gzipped tar of test data.
	fixture, err := os.Open("./testdata.tar.gz")
	require.NoError(t, err)
	require.NoError(t, tar.Untar(dir, fixture))
	require.NoError(t, fixture.Close())

	// Run the command against the extracted series file and log file.
	sep := string(os.PathSeparator)
	require.NoError(t, cmd.Run(
		"--series-file", dir+sep+"_series",
		dir+sep+"L0-00000001.tsl",
	))

	// Validate the report contains the expected summary lines.
	out := output.String()
	for _, want := range []string{
		"[LOG FILE] L0-00000001.tsl",
		"Series:\t\t9",
		"Measurements:\t6",
		"Tag Keys:\t18",
		"Tag Values:\t26",
	} {
		require.Contains(t, out, want)
	}
}

Binary file not shown.

View File

@ -0,0 +1,20 @@
package errors
// Capture is a wrapper function which can be used to capture errors from closing via a defer.
// An example:
// func Example() (err error) {
// f, _ := os.Open(...)
// defer errors.Capture(&err, f.Close)()
// ...
// return
//
// Doing this will result in the error from the f.Close() call being
// put in the error via a ptr, if the error is not nil
func Capture(rErr *error, fn func() error) func() {
return func() {
err := fn()
if *rErr == nil {
*rErr = err
}
}
}

83
pkg/tar/untar.go Normal file
View File

@ -0,0 +1,83 @@
package tar
import (
	"archive/tar"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"

	errors2 "github.com/influxdata/influxdb/pkg/errors"
)
// Untar takes a destination path and a reader; a tar reader loops over the
// tarfile creating the file structure at 'dir' along the way, and writing any
// files. Entries whose names would resolve outside 'dir' are rejected.
func Untar(dir string, r io.Reader) (rErr error) {
	gzr, err := gzip.NewReader(r)
	if err != nil {
		return err
	}
	// Surface a Close error unless an earlier error already occurred.
	defer errors2.Capture(&rErr, gzr.Close)()

	tr := tar.NewReader(gzr)

	for {
		header, err := tr.Next()
		switch {
		// if no more files are found return
		case errors.Is(err, io.EOF):
			return nil

		// return any other error
		case err != nil:
			return err

		// if the header is nil, just skip it (not sure how this happens)
		case header == nil:
			continue
		}

		// the target location where the dir/file should be created
		target := filepath.Join(dir, header.Name)

		// Reject entries whose cleaned path escapes the destination
		// directory (e.g. "../../etc/passwd" — the "zip-slip" traversal).
		if target != filepath.Clean(dir) &&
			!strings.HasPrefix(target, filepath.Clean(dir)+string(os.PathSeparator)) {
			return fmt.Errorf("tar entry %q escapes destination directory", header.Name)
		}

		// the following switch could also be done using fi.Mode(), not sure if there
		// a benefit of using one vs. the other.
		// fi := header.FileInfo()

		// check the file type
		switch header.Typeflag {
		// if its a dir and it doesn't exist create it
		case tar.TypeDir:
			if _, err := os.Stat(target); err != nil {
				if err := os.MkdirAll(target, 0755); err != nil {
					return err
				}
			}

		// if it's a file create it
		case tar.TypeReg:
			if err := untarFile(target, tr, header); err != nil {
				return err
			}
		}
	}
}
// untarFile writes one regular-file entry from the tar stream to target,
// creating the file with the mode recorded in the tar header.
func untarFile(target string, tr *tar.Reader, header *tar.Header) (rErr error) {
	out, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
	if err != nil {
		return err
	}
	// Report a Close failure unless an earlier error already occurred.
	defer errors2.Capture(&rErr, out.Close)()

	// Stream the entry's contents from the archive into the file.
	_, err = io.Copy(out, tr)
	return err
}

View File

@ -195,6 +195,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *
func (i *Index) Bytes() int {
var b int
i.mu.RLock()
defer i.mu.RUnlock()
b += 24 // mu RWMutex is 24 bytes
b += int(unsafe.Sizeof(i.partitions))
for _, p := range i.partitions {
@ -215,7 +216,6 @@ func (i *Index) Bytes() int {
b += int(unsafe.Sizeof(i.database)) + len(i.database)
b += int(unsafe.Sizeof(i.version))
b += int(unsafe.Sizeof(i.PartitionN))
i.mu.RUnlock()
return b
}
@ -886,10 +886,10 @@ func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error
// Add sketch tombstone.
i.mu.Lock()
defer i.mu.Unlock()
for _, key := range keys {
i.sTSketch.Add(key)
}
i.mu.Unlock()
return nil
}

View File

@ -83,6 +83,7 @@ func NewIndexFile(sfile *tsdb.SeriesFile) *IndexFile {
func (f *IndexFile) bytes() int {
var b int
f.wg.Add(1)
defer f.wg.Done()
b += 16 // wg WaitGroup is 16 bytes
b += int(unsafe.Sizeof(f.data))
// Do not count f.data contents because it is mmap'd
@ -101,7 +102,6 @@ func (f *IndexFile) bytes() int {
b += 24 // mu RWMutex is 24 bytes
b += int(unsafe.Sizeof(f.compacting))
b += int(unsafe.Sizeof(f.path)) + len(f.path)
f.wg.Done()
return b
}
@ -160,8 +160,8 @@ func (f *IndexFile) Size() int64 { return int64(len(f.data)) }
// Compacting returns true if the file is being compacted.
// The displayed text both deferred f.mu.RUnlock() and called it explicitly,
// which unlocks the RWMutex twice and panics at runtime; keep only the defer.
func (f *IndexFile) Compacting() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.compacting
}

View File

@ -239,8 +239,8 @@ func (f *LogFile) Release() { f.wg.Done() }
// Stat returns size and last modification time of the file.
// The displayed text both deferred f.mu.RUnlock() and called it explicitly,
// which unlocks the RWMutex twice and panics at runtime; keep only the defer.
func (f *LogFile) Stat() (int64, time.Time) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.size, f.modTime
}
@ -257,8 +257,8 @@ func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
// Size returns the size of the file, in bytes.
// The displayed text both deferred f.mu.RUnlock() and called it explicitly,
// which unlocks the RWMutex twice and panics at runtime; keep only the defer.
func (f *LogFile) Size() int64 {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.size
}