diff --git a/go.mod b/go.mod index 8e7fd17fb2..530b881f7c 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,7 @@ require ( github.com/yudai/gojsondiff v1.0.0 github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect github.com/yudai/pp v2.0.1+incompatible // indirect + go.uber.org/multierr v1.1.0 go.uber.org/zap v1.9.1 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/net v0.0.0-20190311183353-d8887717615a diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 6a1352e21e..d7cd31f4f1 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/influxdb/pkg/lifecycle" "github.com/influxdata/influxdb/pkg/rhh" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -146,7 +147,11 @@ func (f *SeriesFile) Open(ctx context.Context) error { p.tracker.enabled = f.metricsEnabled if err := p.Open(); err != nil { - f.Close() + f.Logger.Error("Unable to open series file", + zap.String("path", f.path), + zap.Int("partition", p.ID()), + zap.Error(err)) + f.closeNoLock() return err } f.partitions = append(f.partitions, p) @@ -158,21 +163,22 @@ func (f *SeriesFile) Open(ctx context.Context) error { return nil } -// Close unmaps the data file. -func (f *SeriesFile) Close() (err error) { - f.mu.Lock() - defer f.mu.Unlock() - +func (f *SeriesFile) closeNoLock() (err error) { // Close the resource and wait for any outstanding references. f.res.Close() + var errs []error for _, p := range f.partitions { - if e := p.Close(); e != nil && err == nil { - err = e - } + errs = append(errs, p.Close()) } + return multierr.Combine(errs...) +} - return err +// Close unmaps the data file. +func (f *SeriesFile) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + return f.closeNoLock() } // Path returns the path to the file. diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 678ec15e86..6703d83651 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "os" + "path" "testing" "github.com/influxdata/influxdb/logger" @@ -50,6 +51,18 @@ func TestParseSeriesKeyInto(t *testing.T) { } } +// Ensure that broken series files are closed +func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) { + f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0}) + defer f.Close() + f.Logger = logger.New(os.Stdout) + + err := f.Open(context.Background()) + if err == nil { + t.Fatalf("should report error") + } +} + // Ensure series file contains the correct set of series. func TestSeriesFile_Series(t *testing.T) { sfile := MustOpenSeriesFile() @@ -240,6 +253,27 @@ func NewSeriesFile() *SeriesFile { return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)} } +func NewBrokenSeriesFile(content []byte) *SeriesFile { + sFile := NewSeriesFile() + fPath := sFile.Path() + if err := sFile.Open(context.Background()); err != nil { + panic(err) + } + if err := sFile.SeriesFile.Close(); err != nil { + panic(err) + } + + segPath := path.Join(fPath, "00", "0000") + if _, err := os.Stat(segPath); os.IsNotExist(err) { + panic(err) + } + err := ioutil.WriteFile(segPath, content, 0777) + if err != nil { + panic(err) + } + return sFile +} + // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. func MustOpenSeriesFile() *SeriesFile { f := NewSeriesFile()