Merge pull request #10057 from influxdata/jgm-revert-deadlock-fix

Revert "Resolve deadlock"
pull/10062/head
Jacob Marble 2018-07-10 09:27:15 -07:00 committed by GitHub
commit 375eddebb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 77 additions and 76 deletions

View File

@ -563,69 +563,68 @@ func (p *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDItera
// DropMeasurement deletes a measurement from the index. DropMeasurement does
// not remove any series from the index directly.
func (p *Partition) DropMeasurement(name []byte) error {
err := func() error {
fs, err := p.RetainFileSet()
if err != nil {
return err
}
fs.Release()
fs, err := p.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Delete all keys and values.
if kitr := fs.TagKeyIterator(name); kitr != nil {
for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted.
if !k.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
}
// Delete all keys and values.
if kitr := fs.TagKeyIterator(name); kitr != nil {
for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted.
if !k.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
}
}
// Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
}
// Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
}
}
}
}
}
}
// Delete all series.
if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
defer itr.Close()
for {
elem, err := itr.Next()
if err != nil {
return err
} else if elem.SeriesID == 0 {
break
}
if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
return err
}
// Delete all series.
if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
defer itr.Close()
for {
elem, err := itr.Next()
if err != nil {
return err
} else if elem.SeriesID == 0 {
break
}
if err = itr.Close(); err != nil {
if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
return err
}
}
if err = itr.Close(); err != nil {
return err
}
}
// Mark measurement as deleted.
// Mark measurement as deleted.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteMeasurement(name)
}()
if err != nil {
}(); err != nil {
return err
}
@ -647,23 +646,21 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
return fmt.Errorf("uneven batch, partition %s sent %d names and %d tags", p.id, len(names), len(tagsSlice))
}
err := func() error {
// Maintain reference count on files in file set.
fs, err := p.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Ensure fileset cannot change during insert.
p.mu.RLock()
defer p.mu.RUnlock()
// Insert series into log file.
return p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice)
}()
// Maintain reference count on files in file set.
fs, err := p.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Ensure fileset cannot change during insert.
p.mu.RLock()
// Insert series into log file.
if err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice); err != nil {
p.mu.RUnlock()
return err
}
p.mu.RUnlock()
return p.CheckLogFile()
}
@ -1032,8 +1029,6 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
func (p *Partition) Rebuild() {}
// CheckLogFile executes log file compaction and TSI compaction, only if the active log file size exceeds p.MaxLogFileSize.
// Do not call this method from a goroutine that has retained the partition FileSet.
func (p *Partition) CheckLogFile() error {
// Check log file size under read lock.
if size := func() int64 {
@ -1067,11 +1062,8 @@ func (p *Partition) checkLogFile() error {
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.mu.Lock()
defer p.mu.Unlock()
p.compactLogFile(logFile)
p.compact()
p.Compact() // check for new compactions
}()
return nil
@ -1083,11 +1075,11 @@ func (p *Partition) checkLogFile() error {
func (p *Partition) compactLogFile(logFile *LogFile) {
if p.isClosing() {
return
} else if !p.compactionsEnabled() {
return
}
p.mu.Lock()
interrupt := p.compactionInterrupt
p.mu.Unlock()
start := time.Now()
@ -1130,19 +1122,28 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
return
}
// Replace previous log file with index file.
p.fileSet = p.fileSet.MustReplace([]File{logFile}, file)
// Obtain lock to swap in index file and write manifest.
if err := func() error {
p.mu.Lock()
defer p.mu.Unlock()
// Write new manifest.
manifestSize, err := p.Manifest().Write()
if err != nil {
// TODO: Close index if write fails.
// Replace previous log file with index file.
p.fileSet = p.fileSet.MustReplace([]File{logFile}, file)
// Write new manifest.
manifestSize, err := p.Manifest().Write()
if err != nil {
// TODO: Close index if write fails.
return err
}
p.manifestSize = manifestSize
return nil
}(); err != nil {
log.Error("Cannot update manifest", zap.Error(err))
return
}
p.manifestSize = manifestSize
elapsed := time.Since(start)
log.Info("Log file compacted",
logger.DurationLiteral("elapsed", elapsed),