backport: Fix open/close race in SeriesFile (#13837)
parent
033dbaa136
commit
aa5c77409d
1
go.mod
1
go.mod
|
@ -120,6 +120,7 @@ require (
|
|||
github.com/yudai/gojsondiff v1.0.0
|
||||
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
|
||||
github.com/yudai/pp v2.0.1+incompatible // indirect
|
||||
go.uber.org/multierr v1.1.0
|
||||
go.uber.org/zap v1.9.1
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/influxdata/influxdb/pkg/lifecycle"
|
||||
"github.com/influxdata/influxdb/pkg/rhh"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -146,7 +147,11 @@ func (f *SeriesFile) Open(ctx context.Context) error {
|
|||
p.tracker.enabled = f.metricsEnabled
|
||||
|
||||
if err := p.Open(); err != nil {
|
||||
f.Close()
|
||||
f.Logger.Error("Unable to open series file",
|
||||
zap.String("path", f.path),
|
||||
zap.Int("partition", p.ID()),
|
||||
zap.Error(err))
|
||||
f.closeNoLock()
|
||||
return err
|
||||
}
|
||||
f.partitions = append(f.partitions, p)
|
||||
|
@ -158,21 +163,22 @@ func (f *SeriesFile) Open(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close unmaps the data file.
|
||||
func (f *SeriesFile) Close() (err error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
func (f *SeriesFile) closeNoLock() (err error) {
|
||||
// Close the resource and wait for any outstanding references.
|
||||
f.res.Close()
|
||||
|
||||
var errs []error
|
||||
for _, p := range f.partitions {
|
||||
if e := p.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
errs = append(errs, p.Close())
|
||||
}
|
||||
return multierr.Combine(errs...)
|
||||
}
|
||||
|
||||
return err
|
||||
// Close unmaps the data file.
|
||||
func (f *SeriesFile) Close() error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
return f.closeNoLock()
|
||||
}
|
||||
|
||||
// Path returns the path to the file.
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
|
@ -50,6 +51,18 @@ func TestParseSeriesKeyInto(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that broken series files are closed
|
||||
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
|
||||
f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})
|
||||
defer f.Close()
|
||||
f.Logger = logger.New(os.Stdout)
|
||||
|
||||
err := f.Open(context.Background())
|
||||
if err == nil {
|
||||
t.Fatalf("should report error")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure series file contains the correct set of series.
|
||||
func TestSeriesFile_Series(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
|
@ -240,6 +253,27 @@ func NewSeriesFile() *SeriesFile {
|
|||
return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)}
|
||||
}
|
||||
|
||||
func NewBrokenSeriesFile(content []byte) *SeriesFile {
|
||||
sFile := NewSeriesFile()
|
||||
fPath := sFile.Path()
|
||||
if err := sFile.Open(context.Background()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := sFile.SeriesFile.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
segPath := path.Join(fPath, "00", "0000")
|
||||
if _, err := os.Stat(segPath); os.IsNotExist(err) {
|
||||
panic(err)
|
||||
}
|
||||
err := ioutil.WriteFile(segPath, content, 0777)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return sFile
|
||||
}
|
||||
|
||||
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.
|
||||
func MustOpenSeriesFile() *SeriesFile {
|
||||
f := NewSeriesFile()
|
||||
|
|
Loading…
Reference in New Issue