diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 5459cbd118..47399690d6 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -103,35 +103,35 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { } // bytes estimates the memory footprint of this Partition, in bytes. -func (i *Partition) bytes() int { +func (p *Partition) bytes() int { var b int b += 24 // mu RWMutex is 24 bytes - b += int(unsafe.Sizeof(i.opened)) + b += int(unsafe.Sizeof(p.opened)) // Do not count SeriesFile because it belongs to the code that constructed this Partition. - b += int(unsafe.Sizeof(i.activeLogFile)) + i.activeLogFile.bytes() - b += int(unsafe.Sizeof(i.fileSet)) + i.fileSet.bytes() - b += int(unsafe.Sizeof(i.seq)) - b += int(unsafe.Sizeof(i.seriesIDSet)) + i.seriesIDSet.Bytes() - b += int(unsafe.Sizeof(i.levels)) - for _, level := range i.levels { + b += int(unsafe.Sizeof(p.activeLogFile)) + p.activeLogFile.bytes() + b += int(unsafe.Sizeof(p.fileSet)) + p.fileSet.bytes() + b += int(unsafe.Sizeof(p.seq)) + b += int(unsafe.Sizeof(p.seriesIDSet)) + p.seriesIDSet.Bytes() + b += int(unsafe.Sizeof(p.levels)) + for _, level := range p.levels { b += int(unsafe.Sizeof(level)) } - b += int(unsafe.Sizeof(i.levelCompacting)) - for _, levelCompacting := range i.levelCompacting { + b += int(unsafe.Sizeof(p.levelCompacting)) + for _, levelCompacting := range p.levelCompacting { b += int(unsafe.Sizeof(levelCompacting)) } b += 12 // once sync.Once is 12 bytes - b += int(unsafe.Sizeof(i.closing)) + b += int(unsafe.Sizeof(p.closing)) b += 16 // wg sync.WaitGroup is 16 bytes - b += int(unsafe.Sizeof(i.fieldset)) + i.fieldset.Bytes() - b += int(unsafe.Sizeof(i.path)) + len(i.path) - b += int(unsafe.Sizeof(i.id)) + len(i.id) - b += int(unsafe.Sizeof(i.MaxLogFileSize)) - b += int(unsafe.Sizeof(i.compactionInterrupt)) - b += int(unsafe.Sizeof(i.compactionsDisabled)) - b += int(unsafe.Sizeof(i.logger)) - b += int(unsafe.Sizeof(i.manifestSize)) - b += int(unsafe.Sizeof(i.version)) + b += int(unsafe.Sizeof(p.fieldset)) + p.fieldset.Bytes() + b += int(unsafe.Sizeof(p.path)) + len(p.path) + b += int(unsafe.Sizeof(p.id)) + len(p.id) + b += int(unsafe.Sizeof(p.MaxLogFileSize)) + b += int(unsafe.Sizeof(p.compactionInterrupt)) + b += int(unsafe.Sizeof(p.compactionsDisabled)) + b += int(unsafe.Sizeof(p.logger)) + b += int(unsafe.Sizeof(p.manifestSize)) + b += int(unsafe.Sizeof(p.version)) return b } @@ -140,37 +140,37 @@ func (i *Partition) bytes() int { var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST") // Open opens the partition. -func (i *Partition) Open() error { - i.mu.Lock() - defer i.mu.Unlock() +func (p *Partition) Open() error { + p.mu.Lock() + defer p.mu.Unlock() - i.closing = make(chan struct{}) + p.closing = make(chan struct{}) - if i.opened { + if p.opened { return errors.New("index partition already open") } // Validate path is correct. - i.id = filepath.Base(i.path) - _, err := strconv.Atoi(i.id) + p.id = filepath.Base(p.path) + _, err := strconv.Atoi(p.id) if err != nil { return err } // Create directory if it doesn't exist. - if err := os.MkdirAll(i.path, 0777); err != nil { + if err := os.MkdirAll(p.path, 0777); err != nil { return err } // Read manifest file. 
- m, manifestSize, err := ReadManifestFile(filepath.Join(i.path, ManifestFileName)) + m, manifestSize, err := ReadManifestFile(filepath.Join(p.path, ManifestFileName)) if os.IsNotExist(err) { - m = NewManifest(i.ManifestPath()) + m = NewManifest(p.ManifestPath()) } else if err != nil { return err } // Set manifest size on the partition - i.manifestSize = manifestSize + p.manifestSize = manifestSize // Check to see if the MANIFEST file is compatible with the current Index. if err := m.Validate(); err != nil { @@ -178,18 +178,18 @@ func (i *Partition) Open() error { } // Copy compaction levels to the index. - i.levels = make([]CompactionLevel, len(m.Levels)) - copy(i.levels, m.Levels) + p.levels = make([]CompactionLevel, len(m.Levels)) + copy(p.levels, m.Levels) // Set up flags to track whether a level is compacting. - i.levelCompacting = make([]bool, len(i.levels)) + p.levelCompacting = make([]bool, len(p.levels)) // Open each file in the manifest. var files []File for _, filename := range m.Files { switch filepath.Ext(filename) { case LogFileExt: - f, err := i.openLogFile(filepath.Join(i.path, filename)) + f, err := p.openLogFile(filepath.Join(p.path, filename)) if err != nil { return err } @@ -197,56 +197,56 @@ func (i *Partition) Open() error { // Make first log file active, if within threshold. sz, _ := f.Stat() - if i.activeLogFile == nil && sz < i.MaxLogFileSize { - i.activeLogFile = f + if p.activeLogFile == nil && sz < p.MaxLogFileSize { + p.activeLogFile = f } case IndexFileExt: - f, err := i.openIndexFile(filepath.Join(i.path, filename)) + f, err := p.openIndexFile(filepath.Join(p.path, filename)) if err != nil { return err } files = append(files, f) } } - fs, err := NewFileSet(i.levels, i.sfile, files) + fs, err := NewFileSet(p.levels, p.sfile, files) if err != nil { return err } - i.fileSet = fs + p.fileSet = fs // Set initial sequence number. - i.seq = i.fileSet.MaxID() + p.seq = p.fileSet.MaxID() // Delete any files not in the manifest. - if err := i.deleteNonManifestFiles(m); err != nil { + if err := p.deleteNonManifestFiles(m); err != nil { return err } // Ensure a log file exists. - if i.activeLogFile == nil { - if err := i.prependActiveLogFile(); err != nil { + if p.activeLogFile == nil { + if err := p.prependActiveLogFile(); err != nil { return err } } // Build series existance set. - if err := i.buildSeriesSet(); err != nil { + if err := p.buildSeriesSet(); err != nil { return err } // Mark opened. - i.opened = true + p.opened = true // Send a compaction request on start up. - i.compact() + p.compact() return nil } // openLogFile opens a log file and appends it to the index. -func (i *Partition) openLogFile(path string) (*LogFile, error) { - f := NewLogFile(i.sfile, path) +func (p *Partition) openLogFile(path string) (*LogFile, error) { + f := NewLogFile(p.sfile, path) if err := f.Open(); err != nil { return nil, err } @@ -254,8 +254,8 @@ func (i *Partition) openLogFile(path string) (*LogFile, error) { } // openIndexFile opens a log file and appends it to the index. -func (i *Partition) openIndexFile(path string) (*IndexFile, error) { - f := NewIndexFile(i.sfile) +func (p *Partition) openIndexFile(path string) (*IndexFile, error) { + f := NewIndexFile(p.sfile) f.SetPath(path) if err := f.Open(); err != nil { return nil, err @@ -264,8 +264,8 @@ func (i *Partition) openIndexFile(path string) (*IndexFile, error) { } // deleteNonManifestFiles removes all files not in the manifest. 
-func (i *Partition) deleteNonManifestFiles(m *Manifest) error { - dir, err := os.Open(i.path) +func (p *Partition) deleteNonManifestFiles(m *Manifest) error { + dir, err := os.Open(p.path) if err != nil { return err } @@ -288,7 +288,7 @@ func (i *Partition) deleteNonManifestFiles(m *Manifest) error { } } - return nil + return dir.Close() } func (p *Partition) buildSeriesSet() error { @@ -319,30 +319,34 @@ func (p *Partition) buildSeriesSet() error { } // Wait returns once outstanding compactions have finished. -func (i *Partition) Wait() { - i.wg.Wait() +func (p *Partition) Wait() { + p.wg.Wait() } // Close closes the index. -func (i *Partition) Close() error { +func (p *Partition) Close() error { // Wait for goroutines to finish outstanding compactions. - i.once.Do(func() { - close(i.closing) - close(i.compactionInterrupt) + p.once.Do(func() { + close(p.closing) + close(p.compactionInterrupt) }) - i.wg.Wait() + p.wg.Wait() // Lock index and close remaining - i.mu.Lock() - defer i.mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() + + var err error // Close log files. - for _, f := range i.fileSet.files { - f.Close() + for _, f := range p.fileSet.files { + if localErr := f.Close(); localErr != nil { + err = localErr + } } - i.fileSet.files = nil + p.fileSet.files = nil - return nil + return err } // closing returns true if the partition is currently closing. It does not require @@ -357,38 +361,38 @@ func (p *Partition) isClosing() bool { } // Path returns the path to the partition. -func (i *Partition) Path() string { return i.path } +func (p *Partition) Path() string { return p.path } // SeriesFile returns the attached series file. -func (i *Partition) SeriesFile() *tsdb.SeriesFile { return i.sfile } +func (p *Partition) SeriesFile() *tsdb.SeriesFile { return p.sfile } // NextSequence returns the next file identifier. -func (i *Partition) NextSequence() int { - i.mu.Lock() - defer i.mu.Unlock() - return i.nextSequence() +func (p *Partition) NextSequence() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.nextSequence() } -func (i *Partition) nextSequence() int { - i.seq++ - return i.seq +func (p *Partition) nextSequence() int { + p.seq++ + return p.seq } // ManifestPath returns the path to the index's manifest file. -func (i *Partition) ManifestPath() string { - return filepath.Join(i.path, ManifestFileName) +func (p *Partition) ManifestPath() string { + return filepath.Join(p.path, ManifestFileName) } // Manifest returns a manifest for the index. -func (i *Partition) Manifest() *Manifest { +func (p *Partition) Manifest() *Manifest { m := &Manifest{ - Levels: i.levels, - Files: make([]string, len(i.fileSet.files)), - Version: i.version, - path: i.ManifestPath(), + Levels: p.levels, + Files: make([]string, len(p.fileSet.files)), + Version: p.version, + path: p.ManifestPath(), } - for j, f := range i.fileSet.files { + for j, f := range p.fileSet.files { m.Files[j] = filepath.Base(f.Path()) } @@ -396,71 +400,71 @@ func (i *Partition) Manifest() *Manifest { } // WithLogger sets the logger for the index. -func (i *Partition) WithLogger(logger *zap.Logger) { - i.logger = logger.With(zap.String("index", "tsi")) +func (p *Partition) WithLogger(logger *zap.Logger) { + p.logger = logger.With(zap.String("index", "tsi")) } // SetFieldSet sets a shared field set from the engine. 
-func (i *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) { - i.mu.Lock() - i.fieldset = fs - i.mu.Unlock() +func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) { + p.mu.Lock() + p.fieldset = fs + p.mu.Unlock() } // FieldSet returns the fieldset. -func (i *Partition) FieldSet() *tsdb.MeasurementFieldSet { - i.mu.Lock() - fs := i.fieldset - i.mu.Unlock() +func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet { + p.mu.Lock() + fs := p.fieldset + p.mu.Unlock() return fs } // RetainFileSet returns the current fileset and adds a reference count. -func (i *Partition) RetainFileSet() (*FileSet, error) { +func (p *Partition) RetainFileSet() (*FileSet, error) { select { - case <-i.closing: + case <-p.closing: return nil, errors.New("index is closing") default: - i.mu.RLock() - defer i.mu.RUnlock() - return i.retainFileSet(), nil + p.mu.RLock() + defer p.mu.RUnlock() + return p.retainFileSet(), nil } } -func (i *Partition) retainFileSet() *FileSet { - fs := i.fileSet +func (p *Partition) retainFileSet() *FileSet { + fs := p.fileSet fs.Retain() return fs } // FileN returns the active files in the file set. -func (i *Partition) FileN() int { return len(i.fileSet.files) } +func (p *Partition) FileN() int { return len(p.fileSet.files) } // prependActiveLogFile adds a new log file so that the current log file can be compacted. -func (i *Partition) prependActiveLogFile() error { +func (p *Partition) prependActiveLogFile() error { // Open file and insert it into the first position. - f, err := i.openLogFile(filepath.Join(i.path, FormatLogFileName(i.nextSequence()))) + f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence()))) if err != nil { return err } - i.activeLogFile = f + p.activeLogFile = f // Prepend and generate new fileset. - i.fileSet = i.fileSet.PrependLogFile(f) + p.fileSet = p.fileSet.PrependLogFile(f) // Write new manifest. - manifestSize, err := i.Manifest().Write() + manifestSize, err := p.Manifest().Write() if err != nil { // TODO: Close index if write fails. return err } - i.manifestSize = manifestSize + p.manifestSize = manifestSize return nil } // ForEachMeasurementName iterates over all measurement names in the index. -func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error { - fs, err := i.RetainFileSet() +func (p *Partition) ForEachMeasurementName(fn func(name []byte) error) error { + fs, err := p.RetainFileSet() if err != nil { return err } @@ -498,8 +502,8 @@ func (p *Partition) MeasurementHasSeries(name []byte) (bool, error) { } // MeasurementIterator returns an iterator over all measurement names. -func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) { - fs, err := i.RetainFileSet() +func (p *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) { + fs, err := p.RetainFileSet() if err != nil { return nil, err } @@ -512,8 +516,8 @@ func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) { } // MeasurementExists returns true if a measurement exists. 
-func (i *Partition) MeasurementExists(name []byte) (bool, error) {
-	fs, err := i.RetainFileSet()
+func (p *Partition) MeasurementExists(name []byte) (bool, error) {
+	fs, err := p.RetainFileSet()
 	if err != nil {
 		return false, err
 	}
@@ -522,8 +526,8 @@ func (i *Partition) MeasurementExists(name []byte) (bool, error) {
 	return m != nil && !m.Deleted(), nil
 }
 
-func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
-	fs, err := i.RetainFileSet()
+func (p *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
+	fs, err := p.RetainFileSet()
 	if err != nil {
 		return nil, err
 	}
@@ -544,8 +548,8 @@ func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
 	return a, nil
 }
 
-func (i *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
-	fs, err := i.RetainFileSet()
+func (p *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
+	fs, err := p.RetainFileSet()
 	if err != nil {
 		return nil, err
 	}
@@ -554,71 +558,75 @@ func (i *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDItera
 
 // DropMeasurement deletes a measurement from the index. DropMeasurement does
 // not remove any series from the index directly.
-func (i *Partition) DropMeasurement(name []byte) error {
-	fs, err := i.RetainFileSet()
-	if err != nil {
-		return err
-	}
-	defer fs.Release()
+func (p *Partition) DropMeasurement(name []byte) error {
+	err := func() error {
+		fs, err := p.RetainFileSet()
+		if err != nil {
+			return err
+		}
+		defer fs.Release()
 
-	// Delete all keys and values.
-	if kitr := fs.TagKeyIterator(name); kitr != nil {
-		for k := kitr.Next(); k != nil; k = kitr.Next() {
-			// Delete key if not already deleted.
-			if !k.Deleted() {
-				if err := func() error {
-					i.mu.RLock()
-					defer i.mu.RUnlock()
-					return i.activeLogFile.DeleteTagKey(name, k.Key())
-				}(); err != nil {
-					return err
+		// Delete all keys and values.
+		if kitr := fs.TagKeyIterator(name); kitr != nil {
+			for k := kitr.Next(); k != nil; k = kitr.Next() {
+				// Delete key if not already deleted.
+				if !k.Deleted() {
+					if err := func() error {
+						p.mu.RLock()
+						defer p.mu.RUnlock()
+						return p.activeLogFile.DeleteTagKey(name, k.Key())
+					}(); err != nil {
+						return err
+					}
 				}
-			}
 
-			// Delete each value in key.
-			if vitr := k.TagValueIterator(); vitr != nil {
-				for v := vitr.Next(); v != nil; v = vitr.Next() {
-					if !v.Deleted() {
-						if err := func() error {
-							i.mu.RLock()
-							defer i.mu.RUnlock()
-							return i.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
-						}(); err != nil {
-							return err
+				// Delete each value in key.
+				if vitr := k.TagValueIterator(); vitr != nil {
+					for v := vitr.Next(); v != nil; v = vitr.Next() {
+						if !v.Deleted() {
+							if err := func() error {
+								p.mu.RLock()
+								defer p.mu.RUnlock()
+								return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
+							}(); err != nil {
+								return err
+							}
 						}
 					}
 				}
 			}
 		}
-	}
 
-	// Delete all series.
-	if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
-		defer itr.Close()
-		for {
-			elem, err := itr.Next()
-			if err != nil {
-				return err
-			} else if elem.SeriesID == 0 {
-				break
+		// Delete all series.
+ if itr := fs.MeasurementSeriesIDIterator(name); itr != nil { + defer itr.Close() + for { + elem, err := itr.Next() + if err != nil { + return err + } else if elem.SeriesID == 0 { + break + } + if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil { + return err + } } - if err := i.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil { + if err = itr.Close(); err != nil { return err } } - } - // Mark measurement as deleted. - if err := func() error { - i.mu.RLock() - defer i.mu.RUnlock() - return i.activeLogFile.DeleteMeasurement(name) - }(); err != nil { + // Mark measurement as deleted. + p.mu.RLock() + defer p.mu.RUnlock() + return p.activeLogFile.DeleteMeasurement(name) + }() + if err != nil { return err } // Check if the log file needs to be swapped. - if err := i.CheckLogFile(); err != nil { + if err := p.CheckLogFile(); err != nil { return err } @@ -627,49 +635,51 @@ func (i *Partition) DropMeasurement(name []byte) error { // createSeriesListIfNotExists creates a list of series if they doesn't exist in // bulk. -func (i *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) error { +func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) error { // Is there anything to do? The partition may have been sent an empty batch. if len(names) == 0 { return nil } else if len(names) != len(tagsSlice) { - return fmt.Errorf("uneven batch, partition %s sent %d names and %d tags", i.id, len(names), len(tagsSlice)) + return fmt.Errorf("uneven batch, partition %s sent %d names and %d tags", p.id, len(names), len(tagsSlice)) } - // Maintain reference count on files in file set. - fs, err := i.RetainFileSet() + err := func() error { + // Maintain reference count on files in file set. + fs, err := p.RetainFileSet() + if err != nil { + return err + } + defer fs.Release() + + // Ensure fileset cannot change during insert. + p.mu.RLock() + defer p.mu.RUnlock() + // Insert series into log file. + return p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice) + }() if err != nil { return err } - defer fs.Release() - // Ensure fileset cannot change during insert. - i.mu.RLock() - // Insert series into log file. - if err := i.activeLogFile.AddSeriesList(i.seriesIDSet, names, tagsSlice); err != nil { - i.mu.RUnlock() - return err - } - i.mu.RUnlock() - - return i.CheckLogFile() + return p.CheckLogFile() } -func (i *Partition) DropSeries(seriesID uint64) error { +func (p *Partition) DropSeries(seriesID uint64) error { // Delete series from index. - if err := i.activeLogFile.DeleteSeriesID(seriesID); err != nil { + if err := p.activeLogFile.DeleteSeriesID(seriesID); err != nil { return err } - i.seriesIDSet.Remove(seriesID) + p.seriesIDSet.Remove(seriesID) // Swap log file, if necessary. - return i.CheckLogFile() + return p.CheckLogFile() } // MeasurementsSketches returns the two sketches for the partition by merging all // instances of the type sketch types in all the index files. -func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - fs, err := i.RetainFileSet() +func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { + fs, err := p.RetainFileSet() if err != nil { return nil, nil, err } @@ -679,8 +689,8 @@ func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, // SeriesSketches returns the two sketches for the partition by merging all // instances of the type sketch types in all the index files. 
-func (i *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { - fs, err := i.RetainFileSet() +func (p *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + fs, err := p.RetainFileSet() if err != nil { return nil, nil, err } @@ -689,8 +699,8 @@ func (i *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) } // HasTagKey returns true if tag key exists. -func (i *Partition) HasTagKey(name, key []byte) (bool, error) { - fs, err := i.RetainFileSet() +func (p *Partition) HasTagKey(name, key []byte) (bool, error) { + fs, err := p.RetainFileSet() if err != nil { return false, err } @@ -699,8 +709,8 @@ func (i *Partition) HasTagKey(name, key []byte) (bool, error) { } // HasTagValue returns true if tag value exists. -func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) { - fs, err := i.RetainFileSet() +func (p *Partition) HasTagValue(name, key, value []byte) (bool, error) { + fs, err := p.RetainFileSet() if err != nil { return false, err } @@ -709,8 +719,8 @@ func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) { } // TagKeyIterator returns an iterator for all keys across a single measurement. -func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator { - fs, err := i.RetainFileSet() +func (p *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator { + fs, err := p.RetainFileSet() if err != nil { return nil // TODO(edd): this should probably return an error. } @@ -724,8 +734,8 @@ func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator { } // TagValueIterator returns an iterator for all values across a single key. -func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator { - fs, err := i.RetainFileSet() +func (p *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator { + fs, err := p.RetainFileSet() if err != nil { return nil // TODO(edd): this should probably return an error. } @@ -739,8 +749,8 @@ func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator { } // TagKeySeriesIDIterator returns a series iterator for all values across a single key. -func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { - fs, err := i.RetainFileSet() +func (p *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { + fs, err := p.RetainFileSet() if err != nil { return nil // TODO(edd): this should probably return an error. } @@ -754,8 +764,8 @@ func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat } // TagValueSeriesIDIterator returns a series iterator for a single key value. -func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator { - fs, err := i.RetainFileSet() +func (p *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator { + fs, err := p.RetainFileSet() if err != nil { return nil // TODO(edd): this should probably return an error. } @@ -769,8 +779,8 @@ func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie } // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. 
-func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - fs, err := i.RetainFileSet() +func (p *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { + fs, err := p.RetainFileSet() if err != nil { return nil, err } @@ -780,8 +790,8 @@ func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (m } // ForEachMeasurementTagKey iterates over all tag keys in a measurement. -func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - fs, err := i.RetainFileSet() +func (p *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + fs, err := p.RetainFileSet() if err != nil { return err } @@ -803,72 +813,72 @@ func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) er // TagKeyCardinality always returns zero. // It is not possible to determine cardinality of tags across index files. -func (i *Partition) TagKeyCardinality(name, key []byte) int { +func (p *Partition) TagKeyCardinality(name, key []byte) int { return 0 } -func (i *Partition) SetFieldName(measurement []byte, name string) {} -func (i *Partition) RemoveShard(shardID uint64) {} -func (i *Partition) AssignShard(k string, shardID uint64) {} +func (p *Partition) SetFieldName(measurement []byte, name string) {} +func (p *Partition) RemoveShard(shardID uint64) {} +func (p *Partition) AssignShard(k string, shardID uint64) {} // Compact requests a compaction of log files. -func (i *Partition) Compact() { - i.mu.Lock() - defer i.mu.Unlock() - i.compact() +func (p *Partition) Compact() { + p.mu.Lock() + defer p.mu.Unlock() + p.compact() } -func (i *Partition) DisableCompactions() { - i.mu.Lock() - defer i.mu.Unlock() - i.compactionsDisabled++ +func (p *Partition) DisableCompactions() { + p.mu.Lock() + defer p.mu.Unlock() + p.compactionsDisabled++ select { - case <-i.closing: + case <-p.closing: return default: } - if i.compactionsDisabled == 0 { - close(i.compactionInterrupt) - i.compactionInterrupt = make(chan struct{}) + if p.compactionsDisabled == 0 { + close(p.compactionInterrupt) + p.compactionInterrupt = make(chan struct{}) } } -func (i *Partition) EnableCompactions() { - i.mu.Lock() - defer i.mu.Unlock() +func (p *Partition) EnableCompactions() { + p.mu.Lock() + defer p.mu.Unlock() // Already enabled? - if i.compactionsEnabled() { + if p.compactionsEnabled() { return } - i.compactionsDisabled-- + p.compactionsDisabled-- } -func (i *Partition) compactionsEnabled() bool { - return i.compactionsDisabled == 0 +func (p *Partition) compactionsEnabled() bool { + return p.compactionsDisabled == 0 } // compact compacts continguous groups of files that are not currently compacting. -func (i *Partition) compact() { - if i.isClosing() { +func (p *Partition) compact() { + if p.isClosing() { return - } else if !i.compactionsEnabled() { + } else if !p.compactionsEnabled() { return } - interrupt := i.compactionInterrupt + interrupt := p.compactionInterrupt - fs := i.retainFileSet() + fs := p.retainFileSet() defer fs.Release() // Iterate over each level we are going to compact. // We skip the first level (0) because it is log files and they are compacted separately. // We skip the last level because the files have no higher level to compact into. - minLevel, maxLevel := 1, len(i.levels)-2 + minLevel, maxLevel := 1, len(p.levels)-2 for level := minLevel; level <= maxLevel; level++ { // Skip level if it is currently compacting. 
- if i.levelCompacting[level] { + if p.levelCompacting[level] { continue } @@ -884,25 +894,25 @@ func (i *Partition) compact() { IndexFiles(files).Retain() // Mark the level as compacting. - i.levelCompacting[level] = true + p.levelCompacting[level] = true // Execute in closure to save reference to the group within the loop. func(files []*IndexFile, level int) { // Start compacting in a separate goroutine. - i.wg.Add(1) + p.wg.Add(1) go func() { - defer i.wg.Done() // Compact to a new level. - i.compactToLevel(files, level+1, interrupt) + p.compactToLevel(files, level+1, interrupt) // Ensure compaction lock for the level is released. - i.mu.Lock() - i.levelCompacting[level] = false - i.mu.Unlock() + p.mu.Lock() + p.levelCompacting[level] = false + p.mu.Unlock() + p.wg.Done() // Check for new compactions - i.Compact() + p.Compact() }() }(files, level) } @@ -910,12 +920,12 @@ func (i *Partition) compact() { // compactToLevel compacts a set of files into a new file. Replaces old files with // compacted file on successful completion. This runs in a separate goroutine. -func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-chan struct{}) { +func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-chan struct{}) { assert(len(files) >= 2, "at least two index files are required for compaction") assert(level > 0, "cannot compact level zero") // Build a logger for this compaction. - log, logEnd := logger.NewOperation(i.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level)) + log, logEnd := logger.NewOperation(p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level)) defer logEnd() // Check for cancellation. @@ -935,7 +945,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch start := time.Now() // Create new index file. - path := filepath.Join(i.path, FormatIndexFileName(i.NextSequence(), level)) + path := filepath.Join(p.path, FormatIndexFileName(p.NextSequence(), level)) f, err := os.Create(path) if err != nil { log.Error("Cannot create compaction files", zap.Error(err)) @@ -949,8 +959,8 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch ) // Compact all index files to new index file. - lvl := i.levels[level] - n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K, interrupt) + lvl := p.levels[level] + n, err := IndexFiles(files).CompactTo(f, p.sfile, lvl.M, lvl.K, interrupt) if err != nil { log.Error("Cannot compact index files", zap.Error(err)) return @@ -963,7 +973,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch } // Reopen as an index file. - file := NewIndexFile(i.sfile) + file := NewIndexFile(p.sfile) file.SetPath(path) if err := file.Open(); err != nil { log.Error("Cannot open new index file", zap.Error(err)) @@ -972,19 +982,19 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch // Obtain lock to swap in index file and write manifest. if err := func() error { - i.mu.Lock() - defer i.mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() // Replace previous files with new index file. - i.fileSet = i.fileSet.MustReplace(IndexFiles(files).Files(), file) + p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file) // Write new manifest. - manifestSize, err := i.Manifest().Write() + manifestSize, err := p.Manifest().Write() if err != nil { // TODO: Close index if write fails. 
return err } - i.manifestSize = manifestSize + p.manifestSize = manifestSize return nil }(); err != nil { log.Error("Cannot write manifest", zap.Error(err)) @@ -1016,43 +1026,48 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch } } -func (i *Partition) Rebuild() {} +func (p *Partition) Rebuild() {} -func (i *Partition) CheckLogFile() error { +// CheckLogFile executes log file compaction and TSI compaction, only if the active log file size exceeds p.MaxLogFileSize. +// Do not call this method from a goroutine that has retained the partition FileSet. +func (p *Partition) CheckLogFile() error { // Check log file size under read lock. if size := func() int64 { - i.mu.RLock() - defer i.mu.RUnlock() - return i.activeLogFile.Size() - }(); size < i.MaxLogFileSize { + p.mu.RLock() + defer p.mu.RUnlock() + return p.activeLogFile.Size() + }(); size < p.MaxLogFileSize { return nil } // If file size exceeded then recheck under write lock and swap files. - i.mu.Lock() - defer i.mu.Unlock() - return i.checkLogFile() + p.mu.Lock() + defer p.mu.Unlock() + return p.checkLogFile() } -func (i *Partition) checkLogFile() error { - if i.activeLogFile.Size() < i.MaxLogFileSize { +func (p *Partition) checkLogFile() error { + if p.activeLogFile.Size() < p.MaxLogFileSize { return nil } // Swap current log file. - logFile := i.activeLogFile + logFile := p.activeLogFile // Open new log file and insert it into the first position. - if err := i.prependActiveLogFile(); err != nil { + if err := p.prependActiveLogFile(); err != nil { return err } // Begin compacting in a background goroutine. - i.wg.Add(1) + p.wg.Add(1) go func() { - defer i.wg.Done() - i.compactLogFile(logFile) - i.Compact() // check for new compactions + defer p.wg.Done() + p.mu.Lock() + defer p.mu.Unlock() + + p.compactLogFile(logFile) + p.compact() }() return nil @@ -1061,14 +1076,14 @@ func (i *Partition) checkLogFile() error { // compactLogFile compacts f into a tsi file. The new file will share the // same identifier but will have a ".tsi" extension. Once the log file is // compacted then the manifest is updated and the log file is discarded. -func (i *Partition) compactLogFile(logFile *LogFile) { - if i.isClosing() { +func (p *Partition) compactLogFile(logFile *LogFile) { + if p.isClosing() { + return + } else if !p.compactionsEnabled() { return } - i.mu.Lock() - interrupt := i.compactionInterrupt - i.mu.Unlock() + interrupt := p.compactionInterrupt start := time.Now() @@ -1077,11 +1092,11 @@ func (i *Partition) compactLogFile(logFile *LogFile) { assert(id != 0, "cannot parse log file id: %s", logFile.Path()) // Build a logger for this compaction. - log, logEnd := logger.NewOperation(i.logger, "TSI log compaction", "tsi1_compact_log_file", zap.Int("tsi1_log_file_id", id)) + log, logEnd := logger.NewOperation(p.logger, "TSI log compaction", "tsi1_compact_log_file", zap.Int("tsi1_log_file_id", id)) defer logEnd() // Create new index file. - path := filepath.Join(i.path, FormatIndexFileName(id, 1)) + path := filepath.Join(p.path, FormatIndexFileName(id, 1)) f, err := os.Create(path) if err != nil { log.Error("Cannot create index file", zap.Error(err)) @@ -1090,7 +1105,7 @@ func (i *Partition) compactLogFile(logFile *LogFile) { defer f.Close() // Compact log file to new index file. 
- lvl := i.levels[1] + lvl := p.levels[1] n, err := logFile.CompactTo(f, lvl.M, lvl.K, interrupt) if err != nil { log.Error("Cannot compact log file", zap.Error(err), zap.String("path", logFile.Path())) @@ -1104,35 +1119,26 @@ func (i *Partition) compactLogFile(logFile *LogFile) { } // Reopen as an index file. - file := NewIndexFile(i.sfile) + file := NewIndexFile(p.sfile) file.SetPath(path) if err := file.Open(); err != nil { log.Error("Cannot open compacted index file", zap.Error(err), zap.String("path", file.Path())) return } - // Obtain lock to swap in index file and write manifest. - if err := func() error { - i.mu.Lock() - defer i.mu.Unlock() + // Replace previous log file with index file. + p.fileSet = p.fileSet.MustReplace([]File{logFile}, file) - // Replace previous log file with index file. - i.fileSet = i.fileSet.MustReplace([]File{logFile}, file) - - // Write new manifest. - manifestSize, err := i.Manifest().Write() - if err != nil { - // TODO: Close index if write fails. - return err - } - - i.manifestSize = manifestSize - return nil - }(); err != nil { + // Write new manifest. + manifestSize, err := p.Manifest().Write() + if err != nil { + // TODO: Close index if write fails. log.Error("Cannot update manifest", zap.Error(err)) return } + p.manifestSize = manifestSize + elapsed := time.Since(start) log.Info("Log file compacted", logger.DurationLiteral("elapsed", elapsed), diff --git a/tsdb/shard.go b/tsdb/shard.go index 81d6085606..746e0e4ed2 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -308,12 +308,13 @@ func (s *Shard) Open() error { return err } + idx.WithLogger(s.baseLogger) + // Open index. if err := idx.Open(); err != nil { return err } s.index = idx - idx.WithLogger(s.baseLogger) // Initialize underlying engine. e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
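
Note on the locking pattern (illustrative, not part of the patch): DropMeasurement and createSeriesListIfNotExists above are both restructured so that the retained FileSet and the read lock are scoped to an immediately-invoked closure, and CheckLogFile only runs after that closure returns — which is exactly the constraint the new CheckLogFile comment documents. The sketch below is a minimal, self-contained Go illustration of that pattern under those assumptions; partition, refCountedFileSet, checkLogFile, and dropSomething are hypothetical stand-ins, not the real tsi1 types.

package main

import (
	"fmt"
	"sync"
)

// refCountedFileSet is a hypothetical stand-in for a retain/release file set:
// readers retain it while working and release it when done.
type refCountedFileSet struct{ refs sync.WaitGroup }

func (fs *refCountedFileSet) Retain()  { fs.refs.Add(1) }
func (fs *refCountedFileSet) Release() { fs.refs.Done() }

// partition is a hypothetical stand-in for the partition in the patch.
type partition struct {
	mu sync.RWMutex
	fs refCountedFileSet
}

// checkLogFile mimics the documented constraint: it takes the write lock and
// waits for all retained references, so it must not be called while the
// caller still holds the read lock or a retained file set.
func (p *partition) checkLogFile() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.fs.refs.Wait() // would block forever if the caller still held a reference
	fmt.Println("log file swapped")
}

// dropSomething shows the closure pattern: retain the file set and take the
// read lock inside a closure so both defers run before checkLogFile is called.
func (p *partition) dropSomething() {
	func() {
		p.fs.Retain()
		defer p.fs.Release()

		p.mu.RLock()
		defer p.mu.RUnlock()

		fmt.Println("mutating the active log file under the read lock")
	}()

	// Safe: no read lock held, no file set retained.
	p.checkLogFile()
}

func main() {
	p := &partition{}
	p.dropSomething()
}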