Remove MMAP derefencing code
This code was added to address some slow startup issues. It is believed to be the cause of some segfault panic's that occur at query time when the underlying MMAP array has been unmapped. The current structure of code makes this change unnecessary now.pull/8192/head
parent
1bcf3ae74c
commit
2972a3f223
|
@ -1,3 +1,9 @@
|
|||
## v1.2.3 [unreleased]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#8022](https://github.com/influxdata/influxdb/issues/8022): Segment violation in models.Tags.Get
|
||||
|
||||
## v1.2.2 [2017-03-14]
|
||||
|
||||
### Release Notes
|
||||
|
|
|
@ -444,7 +444,6 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
|
|||
|
||||
// Save reference to index for iterator creation.
|
||||
e.index = index
|
||||
e.FileStore.dereferencer = index
|
||||
|
||||
if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
|
||||
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
|
||||
|
@ -652,7 +651,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
|
|||
_, tags, _ := models.ParseKey(seriesKey)
|
||||
|
||||
s := tsdb.NewSeries(string(seriesKey), tags)
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s, false)
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s, true)
|
||||
s.AssignShard(shardID)
|
||||
|
||||
return nil
|
||||
|
|
|
@ -107,13 +107,6 @@ type TSMFile interface {
|
|||
// BlockIterator returns an iterator pointing to the first block in the file and
|
||||
// allows sequential iteration to each and every block.
|
||||
BlockIterator() *BlockIterator
|
||||
|
||||
// Removes mmap references held by another object.
|
||||
deref(dereferencer)
|
||||
}
|
||||
|
||||
type dereferencer interface {
|
||||
Dereference([]byte)
|
||||
}
|
||||
|
||||
// Statistics gathered by the FileStore.
|
||||
|
@ -144,8 +137,6 @@ type FileStore struct {
|
|||
purger *purger
|
||||
|
||||
currentTempDirID int
|
||||
|
||||
dereferencer dereferencer
|
||||
}
|
||||
|
||||
// FileStat holds information about a TSM file on disk.
|
||||
|
@ -457,13 +448,6 @@ func (f *FileStore) Close() error {
|
|||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
for _, file := range f.files {
|
||||
if f.dereferencer != nil {
|
||||
file.deref(f.dereferencer)
|
||||
}
|
||||
file.Close()
|
||||
}
|
||||
|
||||
f.lastFileStats = nil
|
||||
f.files = nil
|
||||
atomic.StoreInt64(&f.stats.FileCount, 0)
|
||||
|
@ -614,11 +598,6 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
|
|||
continue
|
||||
}
|
||||
|
||||
// Remove any mmap references held by the index.
|
||||
if f.dereferencer != nil {
|
||||
file.deref(f.dereferencer)
|
||||
}
|
||||
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1203,11 +1182,6 @@ func (p *purger) purge() {
|
|||
p.mu.Lock()
|
||||
for k, v := range p.files {
|
||||
if !v.InUse() {
|
||||
// Remove any mmap references held by the index.
|
||||
if p.fileStore.dereferencer != nil {
|
||||
v.deref(p.fileStore.dereferencer)
|
||||
}
|
||||
|
||||
if err := v.Close(); err != nil {
|
||||
p.logger.Info(fmt.Sprintf("purge: close file: %v", err))
|
||||
continue
|
||||
|
|
|
@ -516,13 +516,6 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
|
|||
}
|
||||
}
|
||||
|
||||
// deref removes mmap references held by another object.
|
||||
func (t *TSMReader) deref(d dereferencer) {
|
||||
if acc, ok := t.accessor.(*mmapAccessor); ok && acc.b != nil {
|
||||
d.Dereference(acc.b)
|
||||
}
|
||||
}
|
||||
|
||||
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
|
||||
// implementation can be used for indexes that may be MMAPed into memory.
|
||||
type indirectIndex struct {
|
||||
|
|
43
tsdb/meta.go
43
tsdb/meta.go
|
@ -7,7 +7,6 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
@ -532,16 +531,6 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
|
|||
atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
|
||||
}
|
||||
|
||||
// Dereference removes all references to data within b and moves them to the heap.
|
||||
func (d *DatabaseIndex) Dereference(b []byte) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
for _, s := range d.series {
|
||||
s.Dereference(b)
|
||||
}
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in-memory
|
||||
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
|
||||
// assume the caller will use the appropriate locks.
|
||||
|
@ -1636,7 +1625,7 @@ func (s *Series) ForEachTag(fn func(models.Tag)) {
|
|||
func (s *Series) Tags() models.Tags {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.tags.Clone()
|
||||
return s.tags
|
||||
}
|
||||
|
||||
// CopyTags clones the tags on the series in-place,
|
||||
|
@ -1653,36 +1642,6 @@ func (s *Series) GetTagString(key string) string {
|
|||
return s.tags.GetString(key)
|
||||
}
|
||||
|
||||
// Dereference removes references to a byte slice.
|
||||
func (s *Series) Dereference(b []byte) {
|
||||
s.mu.Lock()
|
||||
|
||||
min := uintptr(unsafe.Pointer(&b[0]))
|
||||
max := min + uintptr(len(b))
|
||||
|
||||
for i := range s.tags {
|
||||
deref(&s.tags[i].Key, min, max)
|
||||
deref(&s.tags[i].Value, min, max)
|
||||
}
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func deref(v *[]byte, min, max uintptr) {
|
||||
vv := *v
|
||||
|
||||
// Ignore if value is not within range.
|
||||
ptr := uintptr(unsafe.Pointer(&vv[0]))
|
||||
if ptr < min || ptr > max {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise copy to the heap.
|
||||
buf := make([]byte, len(vv))
|
||||
copy(buf, vv)
|
||||
*v = buf
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the object to a binary format.
|
||||
func (s *Series) MarshalBinary() ([]byte, error) {
|
||||
s.mu.RLock()
|
||||
|
|
Loading…
Reference in New Issue