Allow iterators to return if shard is closing

pull/9150/head
Edd Robinson 2017-12-15 00:42:40 +00:00
parent 59afd8cc90
commit 289d1f8d44
8 changed files with 157 additions and 138 deletions

View File

@ -136,7 +136,10 @@ func (cmd *Command) run() error {
defer idx.Close()
for i := 0; i < int(idx.PartitionN); i++ {
if err := func() error {
fs := idx.PartitionAt(i).RetainFileSet()
fs, err := idx.PartitionAt(i).RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
return cmd.printFileSet(sfile, fs)
}(); err != nil {

View File

@ -103,7 +103,6 @@ func TestConcurrentServer_TagValues(t *testing.T) {
}
func TestConcurrentServer_ShowMeasurements(t *testing.T) {
t.Skip("TODO")
t.Parallel()
if testing.Short() {

View File

@ -1368,9 +1368,12 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt
return true
}
// TODO(edd) there isn't a need to return an error when instantiating the iterator.
sitr, _ := is.MeasurementSeriesIDIterator(name)
sitr, err := is.MeasurementSeriesIDIterator(name)
if err != nil || sitr == nil {
return false
}
defer sitr.Close()
for {
series, err := sitr.Next()
if err != nil {

View File

@ -472,7 +472,7 @@ func (itr *fileSetSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
func (itr *fileSetSeriesIDIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetMeasurementIterator attaches a fileset to an iterator that is released on close.
@ -492,7 +492,7 @@ func (itr *fileSetMeasurementIterator) Next() ([]byte, error) {
func (itr *fileSetMeasurementIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetTagKeyIterator attaches a fileset to an iterator that is released on close.
@ -512,7 +512,7 @@ func (itr *fileSetTagKeyIterator) Next() ([]byte, error) {
func (itr *fileSetTagKeyIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetTagValueIterator attaches a fileset to an iterator that is released on close.
@ -532,5 +532,5 @@ func (itr *fileSetTagValueIterator) Next() ([]byte, error) {
func (itr *fileSetTagValueIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}

View File

@ -25,7 +25,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.SeriesFile().SeriesIDIterator()
@ -66,7 +69,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.SeriesFile().SeriesIDIterator()
@ -128,7 +134,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
@ -163,7 +172,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
@ -212,7 +224,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -239,7 +254,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -278,7 +296,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
@ -305,7 +326,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
@ -324,66 +348,3 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
}
})
}
/*
var (
byteSliceResult [][]byte
tagsSliceResult []models.Tags
)
func BenchmarkFileset_FilterNamesTags(b *testing.B) {
sfile := MustOpenSeriesFile()
defer sfile.Close()
idx := MustOpenIndex(sfile.SeriesFile, 1)
defer idx.Close()
allNames := make([][]byte, 0, 2000*1000)
allTags := make([]models.Tags, 0, 2000*1000)
for i := 0; i < 2000; i++ {
for j := 0; j < 1000; j++ {
name := []byte(fmt.Sprintf("measurement-%d", i))
tags := models.NewTags(map[string]string{"host": fmt.Sprintf("server-%d", j)})
allNames = append(allNames, name)
allTags = append(allTags, tags)
}
}
if err := idx.CreateSeriesListIfNotExists(nil, allNames, allTags); err != nil {
b.Fatal(err)
}
// idx.CheckFastCompaction()
fs := idx.PartitionAt(0).RetainFileSet()
defer fs.Release()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
names := [][]byte{
[]byte("foo"),
[]byte("measurement-222"), // filtered
[]byte("measurement-222"), // kept (tags won't match)
[]byte("measurements-1"),
[]byte("measurement-900"), // filtered
[]byte("measurement-44444"),
[]byte("bar"),
}
tags := []models.Tags{
nil,
models.NewTags(map[string]string{"host": "server-297"}), // filtered
models.NewTags(map[string]string{"host": "wrong"}),
nil,
models.NewTags(map[string]string{"host": "server-1026"}), // filtered
models.NewTags(map[string]string{"host": "server-23"}), // kept (measurement won't match)
models.NewTags(map[string]string{"host": "zoo"}),
}
b.StartTimer()
byteSliceResult, tagsSliceResult = fs.FilterNamesTags(names, tags)
}
}
*/

View File

@ -766,7 +766,11 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// DiskSizeBytes returns the size of the index on disk.
func (i *Index) DiskSizeBytes() int64 {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
i.logger.Warn("Index is closing down")
return 0
}
defer fs.Release()
var manifestSize int64
@ -810,16 +814,20 @@ func (i *Index) SnapshotTo(path string) error {
// RetainFileSet returns the set of all files across all partitions.
// This is only needed when all files need to be retained for an operation.
func (i *Index) RetainFileSet() *FileSet {
func (i *Index) RetainFileSet() (*FileSet, error) {
i.mu.RLock()
defer i.mu.RUnlock()
fs, _ := NewFileSet(i.database, nil, i.sfile, nil)
for _, p := range i.partitions {
pfs := p.RetainFileSet()
pfs, err := p.RetainFileSet()
if err != nil {
fs.Close()
return nil, err
}
fs.files = append(fs.files, pfs.files...)
}
return fs
return fs, nil
}
func (i *Index) SetFieldName(measurement []byte, name string) {}

View File

@ -188,7 +188,10 @@ func TestIndex_DropMeasurement(t *testing.T) {
}
// Obtain file set to perform lower level checks.
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
// Verify tags & values are gone.

View File

@ -55,7 +55,7 @@ type Partition struct {
// Close management.
once sync.Once
closing chan struct{}
closing chan struct{} // closing is used to inform iterators the partition is closing.
wg sync.WaitGroup
// Fieldset shared with engine.
@ -88,9 +88,8 @@ type Partition struct {
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
path: path,
sfile: sfile,
// Default compaction thresholds.
MaxLogFileSize: DefaultMaxLogFileSize,
@ -110,6 +109,8 @@ func (i *Partition) Open() error {
i.mu.Lock()
defer i.mu.Unlock()
i.closing = make(chan struct{})
if i.opened {
return errors.New("index partition already open")
}
@ -258,13 +259,14 @@ func (i *Partition) Wait() {
// Close closes the index.
func (i *Partition) Close() error {
// Wait for goroutines to finish outstanding compactions.
i.once.Do(func() { close(i.closing) })
i.wg.Wait()
// Lock index and close remaining
i.mu.Lock()
defer i.mu.Unlock()
i.once.Do(func() { close(i.closing) })
// Close log files.
for _, f := range i.fileSet.files {
f.Close()
@ -274,6 +276,17 @@ func (i *Partition) Close() error {
return nil
}
// closing returns true if the partition is currently closing. It does not require
// a lock, so it will always return to callers without blocking.
// func (i *Partition) closing() bool {
// select {
// case <-i.closing:
// return true
// default:
// return false
// }
// }
// Path returns the path to the partition.
func (i *Partition) Path() string { return i.path }
@ -334,11 +347,15 @@ func (i *Partition) FieldSet() *tsdb.MeasurementFieldSet {
}
// RetainFileSet returns the current fileset and adds a reference count.
func (i *Partition) RetainFileSet() *FileSet {
i.mu.RLock()
fs := i.retainFileSet()
i.mu.RUnlock()
return fs
func (i *Partition) RetainFileSet() (*FileSet, error) {
select {
case <-i.closing:
return nil, errors.New("index is closing")
default:
i.mu.RLock()
defer i.mu.RUnlock()
return i.retainFileSet(), nil
}
}
func (i *Partition) retainFileSet() *FileSet {
@ -374,7 +391,10 @@ func (i *Partition) prependActiveLogFile() error {
// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -393,7 +413,10 @@ func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error {
// MeasurementIterator returns an iterator over all measurement names.
func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
itr := fs.MeasurementIterator()
if itr == nil {
fs.Release()
@ -404,14 +427,20 @@ func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) {
// MeasurementExists returns true if a measurement exists.
func (i *Partition) MeasurementExists(name []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
m := fs.Measurement(name)
return m != nil && !m.Deleted(), nil
}
func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -430,13 +459,19 @@ func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
}
func (i *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
return newFileSetSeriesIDIterator(fs, fs.MeasurementSeriesIDIterator(name)), nil
}
// DropMeasurement deletes a measurement from the index.
func (i *Partition) DropMeasurement(name []byte) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Delete all keys and values.
@ -514,7 +549,10 @@ func (i *Partition) DropMeasurement(name []byte) error {
// bulk.
func (i *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) error {
// Maintain reference count on files in file set.
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Ensure fileset cannot change during insert.
@ -559,28 +597,41 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the index files.
func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, nil, err
}
defer fs.Release()
return fs.MeasurementsSketches()
}
// HasTagKey returns true if tag key exists.
func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
return fs.HasTagKey(name, key), nil
}
// HasTagValue returns true if tag value exists.
func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
return fs.HasTagValue(name, key, value), nil
}
// TagKeyIterator returns an iterator for all keys across a single measurement.
func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagKeyIterator(name)
if itr == nil {
fs.Release()
@ -591,7 +642,11 @@ func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator {
// TagValueIterator returns an iterator for all values across a single key.
func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagValueIterator(name, key)
if itr == nil {
fs.Release()
@ -602,7 +657,11 @@ func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagKeySeriesIDIterator(name, key)
if itr == nil {
fs.Release()
@ -613,7 +672,11 @@ func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat
// TagValueSeriesIDIterator returns a series iterator for a single key value.
func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagValueSeriesIDIterator(name, key, value)
if itr == nil {
fs.Release()
@ -624,14 +687,21 @@ func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
defer fs.Release()
return fs.MeasurementTagKeysByExpr(name, expr)
}
// ForEachMeasurementTagKey iterates over all tag keys in a measurement.
func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
itr := fs.TagKeyIterator(name)
@ -654,34 +724,6 @@ func (i *Partition) TagKeyCardinality(name, key []byte) int {
return 0
}
/*
func (i *Partition) MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (tsdb.SeriesIDIterator, error) {
fs := i.RetainFileSet()
defer fs.Release()
itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
return itr, err
}
*/
/*
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Partition) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
fs := i.RetainFileSet()
defer fs.Release()
keys, err := fs.MeasurementSeriesKeysByExpr(name, expr, i.fieldset)
// Clone byte slices since they will be used after the fileset is released.
return bytesutil.CloneSlice(keys), err
}
*/
// SnapshotTo creates hard links to the file set into path.
func (i *Partition) SnapshotTo(path string) error {
i.mu.Lock()