Reduce lock contention, fix rhh lookup.
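Widen rhh hashes from uint32 to uint64 and export HashKey/Dist so the tsi1 block readers share the map's probe math. Replace the WithLogFile write lock with short read-lock sections plus a double-checked CheckLogFile swap, and compare series keys via ReadSeriesKey instead of a fixed-length slice.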

pull/8172/head
Ben Johnson 2017-03-17 09:44:11 -06:00
parent 1807772388
commit 70efc70abe
8 changed files with 198 additions and 165 deletions

View File

@ -10,19 +10,19 @@ import (
// HashMap represents a hash map that implements Robin Hood Hashing.
// https://cs.uwaterloo.ca/research/tr/1986/CS-86-14.pdf
type HashMap struct {
hashes []uint32
hashes []uint64
elems []hashElem
n int
capacity int
threshold int
mask uint32
mask uint64
loadFactor int
}
func NewHashMap(opt Options) *HashMap {
m := &HashMap{
capacity: pow2(opt.Capacity), // Limited to 2^32.
capacity: pow2(opt.Capacity), // Limited to 2^64.
loadFactor: opt.LoadFactor,
}
m.alloc()
@ -45,13 +45,13 @@ func (m *HashMap) Put(key []byte, val interface{}) {
}
// If the key was overwritten then decrement the size.
overwritten := m.insert(m.hashKey(key), key, val)
overwritten := m.insert(HashKey(key), key, val)
if overwritten {
m.n--
}
}
func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten bool) {
func (m *HashMap) insert(hash uint64, key []byte, val interface{}) (overwritten bool) {
pos := int(hash & m.mask)
dist := 0
@ -70,7 +70,7 @@ func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten
// If the existing elem has probed less than us, then swap places with
// existing elem, and keep going to find another slot for that elem.
elemDist := m.dist(m.hashes[pos], pos)
elemDist := Dist(m.hashes[pos], pos, m.capacity)
if elemDist < dist {
// Swap with current position.
e := &m.elems[pos]
@ -83,7 +83,7 @@ func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten
}
// Increment position, wrap around on overflow.
pos = int((uint32(pos) + 1) & m.mask)
pos = int((uint64(pos) + 1) & m.mask)
dist++
}
}
@ -91,9 +91,9 @@ func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten
// alloc allocates the elems and hashes slices according to the currently set capacity.
func (m *HashMap) alloc() {
m.elems = make([]hashElem, m.capacity)
m.hashes = make([]uint32, m.capacity)
m.hashes = make([]uint64, m.capacity)
m.threshold = (m.capacity * m.loadFactor) / 100
m.mask = uint32(m.capacity - 1)
m.mask = uint64(m.capacity - 1)
}
// grow doubles the capacity and reinserts all existing hashes & elements.
@ -118,33 +118,24 @@ func (m *HashMap) grow() {
// index returns the position of key in the hash map.
func (m *HashMap) index(key []byte) int {
hash := m.hashKey(key)
hash := HashKey(key)
pos := int(hash & m.mask)
dist := 0
for {
if m.hashes[pos] == 0 {
return -1
} else if dist > m.dist(m.hashes[pos], pos) {
} else if dist > Dist(m.hashes[pos], pos, m.capacity) {
return -1
} else if m.hashes[pos] == hash && bytes.Equal(m.elems[pos].key, key) {
return pos
}
pos = int(uint32(pos+1) & m.mask)
pos = int(uint64(pos+1) & m.mask)
dist++
}
}
// hashKey computes a hash of key. Hash is always non-zero.
func (m *HashMap) hashKey(key []byte) uint32 {
h := xxhash.Sum64(key)
if h == 0 {
h = 1
}
return uint32(h)
}
// Elem returns the i-th key/value pair of the hash map.
func (m *HashMap) Elem(i int) (key []byte, value interface{}) {
if i >= len(m.elems) {
@ -169,16 +160,11 @@ func (m *HashMap) AverageProbeCount() float64 {
if hash == 0 {
continue
}
sum += float64(m.dist(hash, i))
sum += float64(Dist(hash, i, m.capacity))
}
return sum/float64(m.n) + 1.0
}
// dist returns the probe distance for a hash in a slot index.
func (m *HashMap) dist(hash uint32, i int) int {
return int(uint32(i+m.capacity-int(hash&m.mask)) & m.mask)
}
// Keys returns a list of sorted keys.
func (m *HashMap) Keys() [][]byte {
a := make([][]byte, 0, m.Len())
@ -196,7 +182,7 @@ func (m *HashMap) Keys() [][]byte {
type hashElem struct {
key []byte
value interface{}
hash uint32
hash uint64
}
// Options represents initialization options that are passed to NewHashMap().
@ -211,10 +197,27 @@ var DefaultOptions = Options{
LoadFactor: 90,
}
// HashKey computes a hash of key. The hash is always non-zero because zero marks an empty slot.
func HashKey(key []byte) uint64 {
h := xxhash.Sum64(key)
if h == 0 {
h = 1
}
return h
}
// Dist returns the probe distance for a hash in a slot index.
// NOTE: Capacity must be a power of 2.
func Dist(hash uint64, i, capacity int) int {
mask := uint64(capacity - 1)
dist := int(uint64(i+capacity-int(hash&mask)) & mask)
return dist
}
// pow2 returns the smallest power of 2 greater than or equal to v.
func pow2(v int) int {
for i := 2; i < 1<<30; i *= 2 {
for i := 2; i < 1<<62; i *= 2 {
if i >= v {
return i
}
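For reference, the exported Dist above reduces probe distance to mask arithmetic because capacity is a power of 2. A minimal standalone sketch (not part of the commit) that mirrors rhh.Dist and works one case by hand:

package main

import "fmt"

// dist mirrors rhh.Dist: how far the element stored in slot i has
// probed from its home slot (hash & mask). Capacity must be a
// power of 2.
func dist(hash uint64, i, capacity int) int {
    mask := uint64(capacity - 1)
    return int(uint64(i+capacity-int(hash&mask)) & mask)
}

func main() {
    // Capacity 8: a hash with low bits 6 homes at slot 6. Displaced
    // to slot 1 (wrapping past 7 and 0), it is 3 probes from home.
    fmt.Println(dist(6, 1, 8)) // 3
}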

View File

@ -12,8 +12,8 @@ var s = `
[write.point_generator]
[write.point_generator.basic]
enabled = true
point_count = 100
series_count = 100000
point_count = 1
series_count = 100000000
tick = "10s"
jitter = true
measurement = "cpu"

View File

@ -123,7 +123,6 @@ func (i *Index) Open() error {
// Open each file in the manifest.
for _, filename := range m.Files {
println("dbg/FILE", filename)
switch filepath.Ext(filename) {
case LogFileExt:
f, err := i.openLogFile(filepath.Join(i.Path, filename))
@ -155,6 +154,13 @@ func (i *Index) Open() error {
return err
}
// Ensure a log file exists.
if i.activeLogFile == nil {
if err := i.prependActiveLogFile(); err != nil {
return err
}
}
// Mark opened.
i.opened = true
@ -274,9 +280,9 @@ func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
// RetainFileSet returns the current fileset and adds a reference count.
func (i *Index) RetainFileSet() FileSet {
i.mu.Lock()
i.mu.RLock()
fs := i.retainFileSet()
i.mu.Unlock()
i.mu.RUnlock()
return fs
}
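The RLock switch works because RetainFileSet only reads the current fileset and bumps its reference count, so readers no longer serialize against each other; only code that replaces the file list needs the write lock. A minimal sketch of the pattern, with a hypothetical fileSet rather than the tsi1 one:

package main

import (
    "sync"
    "sync/atomic"
)

type fileSet struct{ refs int64 } // hypothetical stand-in

func (fs *fileSet) Retain()  { atomic.AddInt64(&fs.refs, 1) }
func (fs *fileSet) Release() { atomic.AddInt64(&fs.refs, -1) }

type index struct {
    mu sync.RWMutex
    fs *fileSet
}

// RetainFileSet takes a reference under the read lock: many readers
// can retain concurrently; only swapping i.fs needs i.mu.Lock().
func (i *index) RetainFileSet() *fileSet {
    i.mu.RLock()
    defer i.mu.RUnlock()
    i.fs.Retain()
    return i.fs
}

func main() {
    idx := &index{fs: &fileSet{}}
    fs := idx.RetainFileSet()
    defer fs.Release()
}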
@ -308,24 +314,6 @@ func (i *Index) prependActiveLogFile() error {
return nil
}
// WithLogFile executes fn with the active log file under write lock.
func (i *Index) WithLogFile(fn func(f *LogFile) error) error {
i.mu.Lock()
defer i.mu.Unlock()
return i.withLogFile(fn)
}
func (i *Index) withLogFile(fn func(f *LogFile) error) error {
// Create log file if it doesn't exist.
if i.activeLogFile == nil {
if err := i.prependActiveLogFile(); err != nil {
return err
}
}
return fn(i.activeLogFile)
}
// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
fs := i.RetainFileSet()
@ -383,9 +371,11 @@ func (i *Index) DropMeasurement(name []byte) error {
for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted.
if !k.Deleted() {
if err := i.WithLogFile(func(f *LogFile) error {
return f.DeleteTagKey(name, k.Key())
}); err != nil {
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
return i.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
}
}
@ -394,9 +384,11 @@ func (i *Index) DropMeasurement(name []byte) error {
if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := i.WithLogFile(func(f *LogFile) error {
return f.DeleteTagValue(name, k.Key(), v.Value())
}); err != nil {
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
return i.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
}
}
@ -409,9 +401,11 @@ func (i *Index) DropMeasurement(name []byte) error {
if sitr := fs.MeasurementSeriesIterator(name); sitr != nil {
for s := sitr.Next(); s != nil; s = sitr.Next() {
if !s.Deleted() {
if err := i.WithLogFile(func(f *LogFile) error {
return f.DeleteSeries(s.Name(), s.Tags())
}); err != nil {
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
return i.activeLogFile.DeleteSeries(s.Name(), s.Tags())
}(); err != nil {
return err
}
}
@ -419,86 +413,100 @@ func (i *Index) DropMeasurement(name []byte) error {
}
// Mark measurement as deleted.
if err := i.WithLogFile(func(f *LogFile) error {
return f.DeleteMeasurement(name)
}); err != nil {
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
return i.activeLogFile.DeleteMeasurement(name)
}(); err != nil {
return err
}
// Check if the log file needs to be swapped.
i.mu.Lock()
defer i.mu.Unlock()
i.checkLogFile()
if err := i.CheckLogFile(); err != nil {
return err
}
return nil
}
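The immediately-invoked closures introduced above replace WithLogFile: each deferred RUnlock fires when its closure returns, so the read lock covers exactly one activeLogFile call instead of the whole method. The idiom in isolation, as a minimal sketch:

package main

import (
    "fmt"
    "sync"
)

var mu sync.RWMutex

// deleteOne stands in for a single activeLogFile call.
func deleteOne() error { return nil }

func main() {
    // The closure bounds RLock/RUnlock to one call; the lock is
    // already released by the time the error is inspected.
    if err := func() error {
        mu.RLock()
        defer mu.RUnlock()
        return deleteOne()
    }(); err != nil {
        fmt.Println("delete failed:", err)
    }
}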
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []models.Tags) error {
i.mu.Lock()
defer i.mu.Unlock()
// All slices must be of equal length.
if len(names) != len(tagsSlice) {
return errors.New("names/tags length mismatch")
}
fs := i.retainFileSet()
defer fs.Release()
if err := func() error {
// Ensure fileset cannot change during insert.
i.mu.RLock()
defer i.mu.RUnlock()
// Filter out existing series. Exit if no new series exist.
names, tagsSlice = fs.FilterNamesTags(names, tagsSlice)
if len(names) == 0 {
// Maintain reference count on files in file set.
fs := i.retainFileSet()
defer fs.Release()
// Filter out existing series. Exit if no new series exist.
names, tagsSlice = fs.FilterNamesTags(names, tagsSlice)
if len(names) == 0 {
return nil
}
// Insert series into log file.
if err := i.activeLogFile.AddSeriesList(names, tagsSlice); err != nil {
return err
}
return nil
}
// Insert series into log file.
if err := i.withLogFile(func(f *LogFile) error {
return f.AddSeriesList(names, tagsSlice)
}); err != nil {
}(); err != nil {
return err
}
// Swap log file, if necessary.
i.checkLogFile()
if err := i.CheckLogFile(); err != nil {
return err
}
return nil
}
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
i.mu.RLock()
defer i.mu.RUnlock()
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
fs := i.retainFileSet()
defer fs.Release()
fs := i.retainFileSet()
defer fs.Release()
if fs.HasSeries(name, tags) {
if fs.HasSeries(name, tags) {
return nil
}
if err := i.activeLogFile.AddSeries(name, tags); err != nil {
return err
}
return nil
}
if err := i.withLogFile(func(f *LogFile) error {
return f.AddSeries(name, tags)
}); err != nil {
}(); err != nil {
return err
}
// Swap log file, if necessary.
i.checkLogFile()
if err := i.CheckLogFile(); err != nil {
return err
}
return nil
}
func (i *Index) DropSeries(key []byte) error {
i.mu.RLock()
defer i.mu.RUnlock()
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
name, tags, err := models.ParseKey(key)
if err != nil {
return err
}
name, tags, err := models.ParseKey(key)
if err != nil {
return err
}
mname := []byte(name)
if err := i.withLogFile(func(f *LogFile) error {
if err := f.DeleteSeries(mname, tags); err != nil {
mname := []byte(name)
if err := i.activeLogFile.DeleteSeries(mname, tags); err != nil {
return err
}
@ -515,16 +523,18 @@ func (i *Index) DropSeries(key []byte) error {
}
// If no more series exist in the measurement then delete the measurement.
if err := f.DeleteMeasurement(mname); err != nil {
if err := i.activeLogFile.DeleteMeasurement(mname); err != nil {
return err
}
return nil
}); err != nil {
}(); err != nil {
return err
}
// Swap log file, if necessary.
i.checkLogFile()
if err := i.CheckLogFile(); err != nil {
return err
}
return nil
}
@ -696,10 +706,8 @@ func (i *Index) SnapshotTo(path string) error {
defer fs.Release()
// Flush active log file, if any.
if i.activeLogFile != nil {
if err := i.activeLogFile.Flush(); err != nil {
return err
}
if err := i.activeLogFile.Flush(); err != nil {
return err
}
if err := os.Mkdir(filepath.Join(path, "index"), 0777); err != nil {
@ -924,17 +932,30 @@ func (i *Index) compactGroup(files []*IndexFile) {
}
}
func (i *Index) checkLogFile() {
// If size is within threshold then ignore.
if i.activeLogFile == nil {
return
} else if size, _ := i.activeLogFile.Stat(); size < i.MaxLogFileSize {
return
func (i *Index) CheckLogFile() error {
// Check log file size under read lock.
if size := func() int64 {
i.mu.RLock()
defer i.mu.RUnlock()
return i.activeLogFile.Size()
}(); size < i.MaxLogFileSize {
return nil
}
// Deactivate current log file.
// If file size exceeded then recheck under write lock and swap files.
i.mu.Lock()
defer i.mu.Unlock()
if i.activeLogFile.Size() < i.MaxLogFileSize {
return nil
}
// Swap current log file.
logFile := i.activeLogFile
i.activeLogFile = nil
// Open new log file and insert it into the first position.
if err := i.prependActiveLogFile(); err != nil {
return err
}
// Begin compacting in a background goroutine.
i.wg.Add(1)
@ -943,6 +964,8 @@ func (i *Index) checkLogFile() {
i.compactLogFile(logFile)
i.Compact() // check for new compactions
}()
return nil
}
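CheckLogFile is a double-checked size test: the common case pays only for a read lock, and the overflow case retests under the write lock so two writers racing past the first check cannot both swap in a new log file. A minimal sketch of the shape, with a plain counter standing in for the log file:

package main

import "sync"

type idx struct {
    mu   sync.RWMutex
    size int64
    max  int64
}

func (i *idx) checkRoll() {
    // Fast path: most calls pay only for a read lock.
    i.mu.RLock()
    under := i.size < i.max
    i.mu.RUnlock()
    if under {
        return
    }
    // Slow path: recheck under the write lock; another goroutine may
    // have rolled the file between the RUnlock and the Lock.
    i.mu.Lock()
    defer i.mu.Unlock()
    if i.size < i.max {
        return
    }
    i.size = 0 // stand-in for swapping in a fresh log file
}

func main() {
    i := &idx{size: 10, max: 8}
    i.checkRoll()
}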
// compactLogFile compacts f into a tsi file. The new file will share the

View File

@ -182,6 +182,14 @@ func (f *LogFile) Stat() (int64, time.Time) {
return size, modTime
}
// Size returns the size of the file, in bytes.
func (f *LogFile) Size() int64 {
f.mu.Lock()
v := f.size
f.mu.Unlock()
return v
}
// Measurement returns a measurement element.
func (f *LogFile) Measurement(name []byte) MeasurementElem {
f.mu.RLock()

View File

@ -65,7 +65,7 @@ func (blk *MeasurementBlock) Version() int { return blk.version }
// Elem returns an element for a measurement.
func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) {
n := binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize])
hash := hashKey(name)
hash := rhh.HashKey(name)
pos := int(hash % n)
// Track current distance
@ -89,7 +89,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool)
}
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.name), pos, int(n)) {
if d > rhh.Dist(rhh.HashKey(e.name), pos, int(n)) {
return MeasurementBlockElem{}, false
}
}
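One subtlety worth noting: these block readers compute pos with hash % n, while rhh.Dist is documented to require a power-of-2 capacity. That appears safe here because the on-disk hash indexes are built from rhh.HashMap instances, whose capacity is always rounded up by pow2().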
@ -437,7 +437,7 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
// Build key hash map
m := rhh.NewHashMap(rhh.Options{
Capacity: len(names),
LoadFactor: 90,
LoadFactor: LoadFactor,
})
for name := range mw.mms {
mm := mw.mms[name]

View File

@ -64,7 +64,7 @@ func (blk *SeriesBlock) HasSeries(name []byte, tags models.Tags) (exists, tombst
bufN := uint64(len(buf))
n := blk.seriesIndexN
hash := hashKey(buf)
hash := rhh.HashKey(buf)
pos := int(hash % n)
// Track current distance
@ -77,13 +77,13 @@ func (blk *SeriesBlock) HasSeries(name []byte, tags models.Tags) (exists, tombst
}
// Evaluate encoded value matches expected.
key := blk.data[offset+1 : offset+1+bufN]
key := ReadSeriesKey(blk.data[offset+1 : offset+1+bufN])
if bytes.Equal(buf, key) {
return true, (blk.data[offset] & SeriesTombstoneFlag) != 0
}
// Check if we've exceeded the probe distance.
max := dist(hashKey(key), pos, int(n))
max := rhh.Dist(rhh.HashKey(key), pos, int(n))
if d > max {
return false, false
}
@ -92,11 +92,6 @@ func (blk *SeriesBlock) HasSeries(name []byte, tags models.Tags) (exists, tombst
pos = (pos + 1) % int(n)
d++
// DEBUG(benbjohnson)
if d > 30 {
println("dbg: high series probe count:", d, offset)
}
if uint64(d) > n {
return false, false
}
@ -109,7 +104,7 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem {
bufN := uint64(len(buf))
n := blk.seriesIndexN
hash := hashKey(buf)
hash := rhh.HashKey(buf)
pos := int(hash % n)
// Track current distance
@ -122,7 +117,7 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem {
}
// Evaluate encoded value matches expected.
key := blk.data[offset+1 : offset+1+bufN]
key := ReadSeriesKey(blk.data[offset+1 : offset+1+bufN])
if bytes.Equal(buf, key) {
var e SeriesBlockElem
e.UnmarshalBinary(blk.data[offset:])
@ -130,7 +125,7 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem {
}
// Check if we've exceeded the probe distance.
if d > dist(hashKey(key), pos, int(n)) {
if d > rhh.Dist(rhh.HashKey(key), pos, int(n)) {
return nil
}
@ -343,6 +338,32 @@ func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {
return dst
}
// ReadSeriesKey returns the series key from the beginning of the buffer.
func ReadSeriesKey(data []byte) []byte {
buf := data
// Name (len+data)
n := int(binary.BigEndian.Uint16(buf))
buf = buf[2+n:]
// Tag count.
tagN := int(binary.BigEndian.Uint16(buf))
buf = buf[2:]
// Read tags.
for i := 0; i < tagN; i++ {
// Key
n := int(binary.BigEndian.Uint16(buf))
buf = buf[2+n:]
// Value
n = int(binary.BigEndian.Uint16(buf))
buf = buf[2+n:]
}
return data[:len(data)-len(buf)]
}
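ReadSeriesKey exists because the old lookup sliced the stored entry to bufN, the probe key's length, so the probe-distance check hashed the wrong bytes whenever the stored key's length differed. Walking the length-prefixed layout recovers the stored key's true extent. A standalone sketch of that layout with hand-checked sizes (hypothetical builder, not the tsi1 encoder):

package main

import (
    "encoding/binary"
    "fmt"
)

func appendUint16(b []byte, v uint16) []byte {
    var tmp [2]byte
    binary.BigEndian.PutUint16(tmp[:], v)
    return append(b, tmp[:]...)
}

// buildKey assembles the layout ReadSeriesKey walks: 2-byte BE name
// length + name, 2-byte BE tag count, then length-prefixed key and
// value bytes for each tag.
func buildKey(name string, tags [][2]string) []byte {
    b := appendUint16(nil, uint16(len(name)))
    b = append(b, name...)
    b = appendUint16(b, uint16(len(tags)))
    for _, kv := range tags {
        b = appendUint16(b, uint16(len(kv[0])))
        b = append(b, kv[0]...)
        b = appendUint16(b, uint16(len(kv[1])))
        b = append(b, kv[1]...)
    }
    return b
}

func main() {
    key := buildKey("cpu", [][2]string{{"host", "a"}})
    // "cpu" + host=a: (2+3) + 2 + (2+4) + (2+1) = 16 bytes.
    fmt.Println(len(key)) // 16
}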
func CompareSeriesKeys(a, b []byte) int {
// Read names.
var n uint16
@ -420,7 +441,7 @@ func NewSeriesBlockEncoder(w io.Writer) *SeriesBlockEncoder {
return &SeriesBlockEncoder{
w: w,
offsets: rhh.NewHashMap(rhh.Options{LoadFactor: 50}),
offsets: rhh.NewHashMap(rhh.Options{LoadFactor: LoadFactor}),
sketch: hll.NewDefaultPlus(),
tSketch: hll.NewDefaultPlus(),

View File

@ -91,7 +91,7 @@ func (blk *TagBlock) UnmarshalBinary(data []byte) error {
// Returns an element with a nil key if not found.
func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
keyN := binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize])
hash := hashKey(key)
hash := rhh.HashKey(key)
pos := int(hash % keyN)
// Track current distance
@ -113,7 +113,7 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
}
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.key), pos, int(keyN)) {
if d > rhh.Dist(rhh.HashKey(e.key), pos, int(keyN)) {
return nil
}
@ -121,11 +121,6 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
pos = (pos + 1) % int(keyN)
d++
// DEBUG(benbjohnson)
if d > 30 {
println("dbg: high tag key probe count:", d)
}
if uint64(d) > keyN {
return nil
}
@ -144,12 +139,11 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
hashData := kelem.hashIndex.buf
valueN := binary.BigEndian.Uint64(hashData[:TagValueNSize])
hash := hashKey(value)
hash := rhh.HashKey(value)
pos := int(hash % valueN)
// Track current distance
var d int
for {
// Find offset of tag value.
offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])
@ -167,7 +161,8 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
}
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.value), pos, int(valueN)) {
max := rhh.Dist(rhh.HashKey(e.value), pos, int(valueN))
if d > max {
return nil
}
@ -175,11 +170,6 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
pos = (pos + 1) % int(valueN)
d++
// DEBUG(benbjohnson)
if d > 30 {
println("dbg: high tag value probe count:", d)
}
if uint64(d) > valueN {
return nil
}
@ -478,7 +468,7 @@ type TagBlockEncoder struct {
func NewTagBlockEncoder(w io.Writer) *TagBlockEncoder {
return &TagBlockEncoder{
w: w,
offsets: rhh.NewHashMap(rhh.Options{LoadFactor: 50}),
offsets: rhh.NewHashMap(rhh.Options{LoadFactor: LoadFactor}),
trailer: TagBlockTrailer{
Version: TagBlockVersion,
},
@ -633,14 +623,14 @@ func (enc *TagBlockEncoder) flushValueHashIndex() error {
key.hashIndex.size = enc.n - key.hashIndex.offset
// Clear offsets.
enc.offsets = rhh.NewHashMap(rhh.Options{LoadFactor: 50})
enc.offsets = rhh.NewHashMap(rhh.Options{LoadFactor: LoadFactor})
return nil
}
// encodeTagKeyBlock encodes the keys section to the writer.
func (enc *TagBlockEncoder) encodeTagKeyBlock() error {
offsets := rhh.NewHashMap(rhh.Options{Capacity: len(enc.keys), LoadFactor: 50})
offsets := rhh.NewHashMap(rhh.Options{Capacity: len(enc.keys), LoadFactor: LoadFactor})
// Encode key list in sorted order.
enc.trailer.KeyData.Offset = enc.n

View File

@ -8,11 +8,13 @@ import (
"io"
"os"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
// LoadFactor is the fill percent for RHH indexes.
const LoadFactor = 80
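// For example, an rhh map allocated with capacity 1024 and this load
// factor sets its grow threshold at 1024*80/100 = 819 elements (see
// threshold in HashMap.alloc).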
// MeasurementElem represents a generic measurement element.
type MeasurementElem interface {
Name() []byte
@ -771,20 +773,6 @@ func writeUvarintTo(w io.Writer, v uint64, n *int64) error {
return err
}
// hashKey hashes a key using murmur3.
func hashKey(key []byte) uint64 {
h := xxhash.Sum64(key)
if h == 0 {
h = 1
}
return h
}
// dist returns the probe distance for a hash in a slot index.
func dist(hash uint64, i, capacity int) int {
return (i + capacity - (int(hash) % capacity)) % capacity
}
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }