When one partition in a TSI index fails to open, all previously opened
partitions should be cleaned up, and the remaining partitions
should not be opened.
closes https://github.com/influxdata/influxdb/issues/23427
(cherry picked from commit d3db48e93d)
closes https://github.com/influxdata/influxdb/issues/23431
pull/23463/head
parent
d0dd842149
commit
07ee889eb3
|
@ -10,15 +10,17 @@ package mmap
|
|||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
errors2 "github.com/influxdata/influxdb/pkg/errors"
|
||||
)
|
||||
|
||||
// Map memory-maps a file.
|
||||
func Map(path string, sz int64) ([]byte, error) {
|
||||
func Map(path string, sz int64) (data []byte, err error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
defer errors2.Capture(&err, f.Close)()
|
||||
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
|
@ -32,7 +34,7 @@ func Map(path string, sz int64) ([]byte, error) {
|
|||
sz = fi.Size()
|
||||
}
|
||||
|
||||
data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
|
||||
data, err = syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -38,13 +38,7 @@ func (fs *FileSet) bytes() int {
|
|||
|
||||
// Close closes all the files in the file set.
|
||||
func (fs FileSet) Close() error {
|
||||
var err error
|
||||
for _, f := range fs.files {
|
||||
if e := f.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
return Files(fs.files).Close()
|
||||
}
|
||||
|
||||
// Retain adds a reference count to all files.
|
||||
|
@ -456,6 +450,16 @@ func (a Files) IDs() []int {
|
|||
return ids
|
||||
}
|
||||
|
||||
func (a Files) Close() error {
|
||||
var err error
|
||||
for _, f := range a {
|
||||
if e := f.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// fileSetSeriesIDIterator attaches a fileset to an iterator that is released on close.
|
||||
type fileSetSeriesIDIterator struct {
|
||||
once sync.Once
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// IndexName is the name of the index.
|
||||
|
@ -252,7 +253,7 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
|
|||
}
|
||||
|
||||
// Open opens the index.
|
||||
func (i *Index) Open() error {
|
||||
func (i *Index) Open() (rErr error) {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
|
@ -281,29 +282,16 @@ func (i *Index) Open() error {
|
|||
partitionN := len(i.partitions)
|
||||
n := i.availableThreads()
|
||||
|
||||
// Store results.
|
||||
errC := make(chan error, partitionN)
|
||||
|
||||
// Run fn on each partition using a fixed number of goroutines.
|
||||
var pidx uint32 // Index of maximum Partition being worked on.
|
||||
for k := 0; k < n; k++ {
|
||||
go func(k int) {
|
||||
for {
|
||||
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
|
||||
if idx >= partitionN {
|
||||
return // No more work.
|
||||
}
|
||||
err := i.partitions[idx].Open()
|
||||
errC <- err
|
||||
}
|
||||
}(k)
|
||||
g := new(errgroup.Group)
|
||||
g.SetLimit(n)
|
||||
for idx := 0; idx < partitionN; idx++ {
|
||||
g.Go(i.partitions[idx].Open)
|
||||
}
|
||||
|
||||
// Check for error
|
||||
for i := 0; i < partitionN; i++ {
|
||||
if err := <-errC; err != nil {
|
||||
return err
|
||||
}
|
||||
err := g.Wait()
|
||||
defer i.cleanUpFail(&rErr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Refresh cached sketches.
|
||||
|
@ -319,6 +307,18 @@ func (i *Index) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) cleanUpFail(err *error) {
|
||||
if nil != *err {
|
||||
for _, p := range i.partitions {
|
||||
if (p != nil) && p.IsOpen() {
|
||||
if e := p.Close(); e != nil {
|
||||
i.logger.Warn("Failed to clean up partition")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compact requests a compaction of partitions.
|
||||
func (i *Index) Compact() {
|
||||
i.mu.Lock()
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
|
@ -14,8 +15,10 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/testing/assert"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/tsdb/index/tsi1"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Bloom filter settings used in tests.
|
||||
|
@ -208,6 +211,28 @@ func TestIndex_DropMeasurement(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestIndex_OpenFail(t *testing.T) {
|
||||
idx := NewDefaultIndex()
|
||||
require.NoError(t, idx.Open())
|
||||
idx.Index.Close()
|
||||
// mess up the index:
|
||||
tslPath := path.Join(idx.Index.Path(), "3", "L0-00000001.tsl")
|
||||
tslFile, err := os.OpenFile(tslPath, os.O_RDWR, 0666)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, tslFile.Truncate(0))
|
||||
// write poisonous TSL file - first byte doesn't matter, remaining bytes are an invalid uvarint
|
||||
_, err = tslFile.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, tslFile.Close())
|
||||
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
|
||||
require.EqualError(t, idx.Index.Open(), "parsing binary-encoded uint64 value failed; binary.Uvarint() returned -11")
|
||||
// ensure each partition is closed:
|
||||
for i := 0; i < int(idx.Index.PartitionN); i++ {
|
||||
assert.Equal(t, idx.Index.PartitionAt(i).FileN(), 0)
|
||||
}
|
||||
require.NoError(t, idx.Close())
|
||||
}
|
||||
|
||||
func TestIndex_Open(t *testing.T) {
|
||||
// Opening a fresh index should set the MANIFEST version to current version.
|
||||
idx := NewDefaultIndex()
|
||||
|
|
|
@ -89,11 +89,11 @@ type Partition struct {
|
|||
// NewPartition returns a new instance of Partition.
|
||||
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
|
||||
return &Partition{
|
||||
closing: make(chan struct{}),
|
||||
path: path,
|
||||
sfile: sfile,
|
||||
seriesIDSet: tsdb.NewSeriesIDSet(),
|
||||
|
||||
closing: make(chan struct{}),
|
||||
path: path,
|
||||
sfile: sfile,
|
||||
seriesIDSet: tsdb.NewSeriesIDSet(),
|
||||
fileSet: &FileSet{},
|
||||
MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
|
||||
MaxLogFileAge: tsdb.DefaultCompactFullWriteColdDuration,
|
||||
|
||||
|
@ -144,7 +144,7 @@ func (p *Partition) bytes() int {
|
|||
var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")
|
||||
|
||||
// Open opens the partition.
|
||||
func (p *Partition) Open() error {
|
||||
func (p *Partition) Open() (rErr error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
|
@ -190,6 +190,12 @@ func (p *Partition) Open() error {
|
|||
|
||||
// Open each file in the manifest.
|
||||
var files []File
|
||||
defer func() {
|
||||
if rErr != nil {
|
||||
Files(files).Close()
|
||||
}
|
||||
}()
|
||||
|
||||
for _, filename := range m.Files {
|
||||
switch filepath.Ext(filename) {
|
||||
case LogFileExt:
|
||||
|
@ -230,7 +236,7 @@ func (p *Partition) Open() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Build series existance set.
|
||||
// Build series existence set.
|
||||
if err := p.buildSeriesSet(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -244,6 +250,10 @@ func (p *Partition) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *Partition) IsOpen() bool {
|
||||
return p.opened
|
||||
}
|
||||
|
||||
// openLogFile opens a log file and appends it to the index.
|
||||
func (p *Partition) openLogFile(path string) (*LogFile, error) {
|
||||
f := NewLogFile(p.sfile, path)
|
||||
|
|
Loading…
Reference in New Issue