Sharded Cache using a hash ring

parent d3e6d4e7ca
commit 66edb32182
CHANGELOG.md
@@ -17,6 +17,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
 - [#7684](https://github.com/influxdata/influxdb/issues/7684): Update Go version to 1.7.4.
 - [#7036](https://github.com/influxdata/influxdb/issues/7036): Switch logging to use structured logging everywhere.
 - [#7723](https://github.com/influxdata/influxdb/pull/7723): Remove the override of GOMAXPROCS.
+- [#7633](https://github.com/influxdata/influxdb/pull/7633): improve write performance significantly.

 ### Bugfixes
Godeps
@@ -2,6 +2,7 @@ collectd.org e84e8af5356e7f47485bbc95c96da6dd7984a67e
 github.com/BurntSushi/toml 99064174e013895bbd9b025c31100bd1d9b590ca
 github.com/bmizerany/pat c068ca2f0aacee5ac3681d68e4d0a003b7d1fd2c
 github.com/boltdb/bolt 4b1ebc1869ad66568b313d0dc410e2be72670dda
+github.com/cespare/xxhash 1ce10610a70c8da08278fbe935f2feafd6d9074d
 github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
 github.com/dgrijalva/jwt-go 24c63f56522a87ec5339cc3567883f1039378fdb
 github.com/dgryski/go-bits 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef
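Note: the one new dependency, cespare/xxhash, is what the new hash ring (ring.go, below) uses to assign series keys to partitions. A minimal standalone sketch of that assignment, using the same vendored package; the key format is borrowed from the ring benchmarks:

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

func main() {
	// ring.go truncates the 64-bit hash to a uint8, so the low byte
	// of the hash selects one of 256 continuum slots.
	h := xxhash.Sum64([]byte("cpu,host=server-1 value=1"))
	fmt.Printf("continuum slot: %d\n", uint8(h))
}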
cache.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"math"
 	"os"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,6 +13,13 @@ import (
 	"github.com/uber-go/zap"
 )

+// ringShards specifies the number of partitions that the hash ring used to
+// store the entry mappings contains. It must be a power of 2. From empirical
+// testing, a value above the number of cores on the machine does not provide
+// any additional benefit. For now we'll set it to the number of cores on the
+// largest box we could imagine running influx.
+const ringShards = 128
+
 var (
 	ErrCacheInvalidCheckpoint = fmt.Errorf("invalid checkpoint")
 	ErrSnapshotInProgress     = fmt.Errorf("snapshot in progress")
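The power-of-2 requirement matters because newring (in ring.go, below) spreads n partitions over a 256-slot continuum using integer division, so the spread is only even when n divides 256, which for 256 = 2^8 means n must be a power of two no larger than 256. A trivial standalone check of the ringShards constant, as a sketch:

package main

import "fmt"

func main() {
	const ringShards = 128 // must be > 0, <= 256, and a power of 2
	fmt.Println(ringShards > 0 && ringShards <= 256 && ringShards&(ringShards-1) == 0) // true
}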
@@ -30,13 +36,6 @@ type entry struct {
 	needSort bool // true if the values are out of order and require deduping.
 }

-// newEntry returns a new instance of entry.
-func newEntry() *entry {
-	return &entry{
-		values: make(Values, 0, 32),
-	}
-}
-
 // newEntryValues returns a new instance of entry with the given values. If the
 // values are not valid, an error is returned.
 func newEntryValues(values []Value) (*entry, error) {
@@ -167,7 +166,7 @@ const (
 type Cache struct {
 	commit  sync.Mutex
 	mu      sync.RWMutex
-	store   map[string]*entry
+	store   *ring
 	size    uint64
 	maxSize uint64
@@ -188,9 +187,10 @@ type Cache struct {
 // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
 // Only used for engine caches, never for snapshots
 func NewCache(maxSize uint64, path string) *Cache {
+	store, _ := newring(ringShards)
 	c := &Cache{
 		maxSize:      maxSize,
-		store:        make(map[string]*entry),
+		store:        store, // Max size for now..
 		stats:        &CacheStatistics{},
 		lastSnapshot: time.Now(),
 	}
@@ -240,20 +240,22 @@ func (c *Cache) Write(key string, values []Value) error {
 	addedSize := uint64(Values(values).Size())

 	// Enough room in the cache?
-	c.mu.Lock()
+	c.mu.RLock()
 	limit := c.maxSize
 	n := c.size + c.snapshotSize + addedSize
+	c.mu.RUnlock()
+
 	if limit > 0 && n > limit {
-		c.mu.Unlock()
 		atomic.AddInt64(&c.stats.WriteErr, 1)
 		return ErrCacheMemorySizeLimitExceeded(n, limit)
 	}

-	if err := c.write(key, values); err != nil {
-		c.mu.Unlock()
+	if err := c.store.write(key, values); err != nil {
 		atomic.AddInt64(&c.stats.WriteErr, 1)
 		return err
 	}
+
+	c.mu.Lock()
 	c.size += addedSize
 	c.mu.Unlock()

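The rewritten Write shows the new locking discipline: the cache-level mutex now guards only the size counters, while per-key synchronization is delegated to the ring's partitions. A toy, self-contained sketch of the pattern follows; the cache type, errCacheFull, and the plain map store are hypothetical stand-ins for the real types, and the limit check is deliberately non-atomic with the write, in the same way as the code above:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCacheFull = errors.New("cache full")

// cache is a toy stand-in for tsm1.Cache: mu guards only the size
// counters; the store does its own locking (per partition, in the
// real ring).
type cache struct {
	mu      sync.RWMutex
	size    uint64
	maxSize uint64
	storeMu sync.Mutex
	store   map[string][]float64
}

func (c *cache) write(key string, vals []float64) error {
	// Optimistic limit check under a short read lock. Concurrent
	// writers may race past the limit together; the real code
	// tolerates a small overshoot in exchange for less contention.
	sz := uint64(8 * len(vals))
	c.mu.RLock()
	n := c.size + sz
	limit := c.maxSize
	c.mu.RUnlock()

	if limit > 0 && n > limit {
		return errCacheFull
	}

	c.storeMu.Lock() // the real ring locks one partition, not the world
	c.store[key] = append(c.store[key], vals...)
	c.storeMu.Unlock()

	c.mu.Lock() // size is only ever touched under the write lock
	c.size += sz
	c.mu.Unlock()
	return nil
}

func main() {
	c := &cache{maxSize: 64, store: map[string][]float64{}}
	fmt.Println(c.write("cpu value=1", []float64{1, 2, 3}))   // <nil>
	fmt.Println(c.write("cpu value=1", make([]float64, 100))) // cache full
}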
@@ -264,37 +266,46 @@ func (c *Cache) Write(key string, values []Value) error {
 	return nil
 }

-// WriteMulti writes the map of keys and associated values to the cache. This function is goroutine-safe.
-// It returns an error if the cache will exceeded its max size by adding the new values. The write attempts
-// to write as many values as possible. If one key fails, the others can still succeed and an error will
-// be returned.
+// WriteMulti writes the map of keys and associated values to the cache. This
+// function is goroutine-safe. It returns an error if the cache would exceed
+// its max size by adding the new values. The write attempts to write as many
+// values as possible. If one key fails, the others can still succeed and an
+// error will be returned.
 func (c *Cache) WriteMulti(values map[string][]Value) error {
-	var totalSz uint64
+	var addedSize uint64
 	for _, v := range values {
-		totalSz += uint64(Values(v).Size())
+		addedSize += uint64(Values(v).Size())
 	}

 	// Enough room in the cache?
 	c.mu.Lock()
+	// Set everything under one lock. We'll optimistically set size here, and
+	// then grab another lock if we need to decrement it due to write error.
+	// This also gives us the store reference without having to take another
+	// lock.
+	c.size += addedSize
 	limit := c.maxSize
-	n := c.size + c.snapshotSize + totalSz
+	n := c.size + c.snapshotSize + addedSize
+	store := c.store
+	c.mu.Unlock()
+
 	if limit > 0 && n > limit {
-		c.mu.Unlock()
 		atomic.AddInt64(&c.stats.WriteErr, 1)
 		return ErrCacheMemorySizeLimitExceeded(n, limit)
 	}

 	var werr error
+
 	for k, v := range values {
-		if err := c.write(k, v); err != nil {
-			// write failed, hold onto the error and adjust
-			// the size delta
+		if err := store.write(k, v); err != nil {
+			// The write failed, hold onto the error and adjust the size delta.
 			werr = err
-			totalSz -= uint64(Values(v).Size())
+			c.mu.Lock()
+			addedSize -= uint64(Values(v).Size())
+			c.size -= uint64(Values(v).Size())
+			c.mu.Unlock()
 		}
 	}
-	c.size += totalSz
-	c.mu.Unlock()

 	// Some points in the batch were dropped. An error is returned so
 	// error stat is incremented as well.
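The rollback arithmetic above (account for the whole batch optimistically, then subtract each key that fails) can be illustrated in isolation; everything in this sketch is hypothetical:

package main

import "fmt"

func main() {
	sizes := map[string]uint64{"foo": 24, "bar": 24, "baz": 24}
	failed := map[string]bool{"bar": true} // pretend "bar" is rejected

	// Optimistically account for the whole batch up front...
	var size uint64
	for _, sz := range sizes {
		size += sz
	}
	// ...then decrement for each key whose write fails.
	for k, sz := range sizes {
		if failed[k] {
			size -= sz
		}
	}
	fmt.Println(size) // 48: only the successful keys stay counted
}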
@@ -304,7 +315,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
 	}

 	// Update the memory size stat
-	c.updateMemSize(int64(totalSz))
+	c.updateMemSize(int64(addedSize))
 	atomic.AddInt64(&c.stats.WriteOK, 1)

 	return werr
@@ -325,29 +336,42 @@ func (c *Cache) Snapshot() (*Cache, error) {

 	// If no snapshot exists, create a new one, otherwise update the existing snapshot
 	if c.snapshot == nil {
+		store, err := newring(ringShards)
+		if err != nil {
+			return nil, err
+		}
+
 		c.snapshot = &Cache{
-			store: make(map[string]*entry, len(c.store)),
+			store: store,
 		}
 	}

-	// Append the current cache values to the snapshot
-	for k, e := range c.store {
-		e.mu.RLock()
-		if _, ok := c.snapshot.store[k]; ok {
-			c.snapshot.store[k].add(e.values)
+	// Append the current cache values to the snapshot.
+	if err := c.store.apply(func(k string, e *entry) error {
+		e.mu.RLock()
+		snapshotEntry, ok := c.snapshot.store.entry(k)
+		if ok {
+			if err := snapshotEntry.add(e.values); err != nil {
+				return err
+			}
 		} else {
-			c.snapshot.store[k] = e
+			c.snapshot.store.add(k, e)
+			snapshotEntry = e
 		}
 		c.snapshotSize += uint64(Values(e.values).Size())
 		if e.needSort {
-			c.snapshot.store[k].needSort = true
+			snapshotEntry.needSort = true
 		}
 		e.mu.RUnlock()
-	}
+		return nil
+	}); err != nil {
+		return nil, err
+	}

 	snapshotSize := c.size // record the number of bytes written into a snapshot

-	c.store = make(map[string]*entry, len(c.store))
+	var err error
+	if c.store, err = newring(ringShards); err != nil {
+		return nil, err
+	}
 	c.size = 0
 	c.lastSnapshot = time.Now()
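Snapshot now copies entries through the ring's apply iterator instead of ranging over a map, so any error the callback returns aborts the copy. A small stand-in sketch of that contract (the store type here is hypothetical, and unlike the real ring it visits entries serially rather than one goroutine per partition):

package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical stand-in for the ring: apply visits every
// entry and stops at the first error the callback returns.
type store map[string]int

func (s store) apply(f func(k string, v int) error) error {
	for k, v := range s {
		if err := f(k, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := store{"cpu value=1": 3, "mem value=1": 5}
	err := s.apply(func(k string, v int) error {
		if v > 4 {
			return errors.New("entry too large: " + k)
		}
		fmt.Println("copied", k)
		return nil
	})
	fmt.Println("apply returned:", err)
}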
@@ -362,10 +386,12 @@ func (c *Cache) Snapshot() (*Cache, error) {
 // coming in while it writes will need the values sorted
 func (c *Cache) Deduplicate() {
 	c.mu.RLock()
-	for _, e := range c.store {
-		e.deduplicate()
-	}
+	store := c.store
 	c.mu.RUnlock()
+
+	// Apply a function that simply calls deduplicate on each entry in the ring.
+	// apply cannot return an error in this invocation.
+	_ = store.apply(func(_ string, e *entry) error { e.deduplicate(); return nil })
 }

 // ClearSnapshot will remove the snapshot cache from the list of flushing caches and
@@ -400,27 +426,32 @@ func (c *Cache) MaxSize() uint64 {
 // Keys returns a sorted slice of all keys under management by the cache.
 func (c *Cache) Keys() []string {
 	c.mu.RLock()
-	var a []string
-	for k, _ := range c.store {
-		a = append(a, k)
-	}
+	store := c.store
 	c.mu.RUnlock()
-	sort.Strings(a)
-	return a
+	return store.keys(true)
 }

+// unsortedKeys returns a slice of all keys under management by the cache. The
+// keys are not sorted.
+func (c *Cache) unsortedKeys() []string {
+	c.mu.RLock()
+	store := c.store
+	c.mu.RUnlock()
+	return store.keys(false)
+}
+
 // Values returns a copy of all values, deduped and sorted, for the given key.
 func (c *Cache) Values(key string) Values {
 	var snapshotEntries *entry

 	c.mu.RLock()
-	e := c.store[key]
+	e, ok := c.store.entry(key)
 	if c.snapshot != nil {
-		snapshotEntries = c.snapshot.store[key]
+		snapshotEntries, _ = c.snapshot.store.entry(key)
 	}
 	c.mu.RUnlock()

-	if e == nil {
+	if !ok {
 		if snapshotEntries == nil {
 			// No values in hot cache or snapshots.
 			return nil
@@ -486,7 +517,7 @@ func (c *Cache) DeleteRange(keys []string, min, max int64) {

 	for _, k := range keys {
 		// Make sure key exists in the cache, skip if it does not
-		e, ok := c.store[k]
+		e, ok := c.store.entry(k)
 		if !ok {
 			continue
 		}
@@ -494,13 +525,13 @@ func (c *Cache) DeleteRange(keys []string, min, max int64) {
 		origSize := uint64(e.size())
 		if min == math.MinInt64 && max == math.MaxInt64 {
 			c.size -= origSize
-			delete(c.store, k)
+			c.store.remove(k)
 			continue
 		}

 		e.filter(min, max)
 		if e.count() == 0 {
-			delete(c.store, k)
+			c.store.remove(k)
 			c.size -= origSize
 			continue
 		}
@@ -516,69 +547,21 @@ func (c *Cache) SetMaxSize(size uint64) {
 	c.mu.Unlock()
 }

-// Store returns the underlying cache store. This is not goroutine safe!
-// Protect access by using the Lock and Unlock functions on Cache.
-func (c *Cache) Store() map[string]*entry {
-	return c.store
-}
-
-func (c *Cache) RLock() {
-	c.mu.RLock()
-}
-
-func (c *Cache) RUnlock() {
-	c.mu.RUnlock()
-}
-
 // values returns the values for the key. It doesn't lock and assumes the data is
 // already sorted. Should only be used in compact.go in the CacheKeyIterator
 func (c *Cache) values(key string) Values {
-	e := c.store[key]
+	e, _ := c.store.entry(key)
 	if e == nil {
 		return nil
 	}
 	return e.values
 }

-// write writes the set of values for the key to the cache. This function assumes
-// the lock has been taken and does not enforce the cache size limits.
-func (c *Cache) write(key string, values []Value) error {
-	e, ok := c.store[key]
-	if !ok {
-		var err error
-		e, err = newEntryValues(values)
-		if err != nil {
-			return err
-		}
-		c.store[key] = e
-		return nil
-	}
-	return e.add(values)
-}
-
-func (c *Cache) entry(key string) *entry {
-	// low-contention path: entry exists, no write operations needed:
+func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error {
 	c.mu.RLock()
-	e, ok := c.store[key]
+	store := c.store
 	c.mu.RUnlock()
-
-	if ok {
-		return e
-	}
-
-	// high-contention path: entry doesn't exist (probably), create a new
-	// one after checking again:
-	c.mu.Lock()
-
-	e, ok = c.store[key]
-	if !ok {
-		e = newEntry()
-		c.store[key] = e
-	}
-
-	c.mu.Unlock()
-
-	return e
+	return store.apply(f)
 }

 // CacheLoader processes a set of WAL segment files, and loads a cache with the data
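ApplyEntryFn is the replacement for external iteration over Cache.Store(); the engine hunks below show its real call sites. A hedged sketch of the call shape, as a helper that could sit alongside the tsm1 package's tests (countKeys is not part of the commit, and sync/atomic is assumed imported). Note that ring.apply fans out one goroutine per partition, so shared state in the callback must be synchronized:

// countKeys reports how many entries the cache currently holds.
// The callback may run concurrently across partitions, so the
// shared counter is updated atomically.
func countKeys(c *Cache) (int64, error) {
	var n int64
	err := c.ApplyEntryFn(func(key string, e *entry) error {
		atomic.AddInt64(&n, 1)
		return nil
	})
	return n, err
}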
cache_test.go
@@ -85,7 +85,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
 	values := Values{v0, v1, v2}
 	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

-	c := NewCache(3*valuesSize, "")
+	c := NewCache(30*valuesSize, "")

 	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -128,7 +128,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
 	values := Values{v0, v1, v2}
 	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

-	c := NewCache(3*valuesSize, "")
+	c := NewCache(30*valuesSize, "")

 	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -202,7 +202,7 @@ func TestCache_Cache_Delete(t *testing.T) {
 	values := Values{v0, v1, v2}
 	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

-	c := NewCache(3*valuesSize, "")
+	c := NewCache(30*valuesSize, "")

 	if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
 		t.Fatalf("failed to write key foo to cache: %s", err.Error())
@@ -719,7 +719,7 @@ type points struct {

 func BenchmarkCacheParallelFloatEntries(b *testing.B) {
 	c := b.N * runtime.GOMAXPROCS(0)
-	cache := NewCache(uint64(c)*fvSize, "")
+	cache := NewCache(uint64(c)*fvSize*10, "")
 	vals := make([]points, c)
 	for i := 0; i < c; i++ {
 		v := make([]Value, 10)
engine.go
@@ -456,20 +456,15 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) error {
 	}

 	// load metadata from the Cache
-	e.Cache.RLock() // shouldn't need the lock, but just to be safe
-	defer e.Cache.RUnlock()
-
-	for key, entry := range e.Cache.Store() {
-
+	if err := e.Cache.ApplyEntryFn(func(key string, entry *entry) error {
 		fieldType, err := entry.values.InfluxQLType()
 		if err != nil {
 			e.logger.Info(fmt.Sprintf("error getting the data type of values for key %s: %s", key, err.Error()))
-			continue
 		}

-		if err := e.addToIndexFromKey(shardID, []byte(key), fieldType, index); err != nil {
-			return err
-		}
+		return e.addToIndexFromKey(shardID, []byte(key), fieldType, index)
+	}); err != nil {
+		return err
 	}

 	e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now)))
@@ -716,7 +711,7 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
 		keyMap[k] = false
 	}

-	for _, k := range e.Cache.Keys() {
+	for _, k := range e.Cache.unsortedKeys() {
 		seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
 		keyMap[string(seriesKey)] = true
 	}
@@ -788,15 +783,15 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {

 	// find the keys in the cache and remove them
 	walKeys := deleteKeys[:0]
-	e.Cache.RLock()
-	s := e.Cache.Store()
-	for k, _ := range s {
+
+	// ApplyEntryFn cannot return an error in this invocation.
+	_ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error {
 		seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
 		if _, ok := keyMap[string(seriesKey)]; ok {
 			walKeys = append(walKeys, k)
 		}
-	}
-	e.Cache.RUnlock()
+		return nil
+	})

 	e.Cache.DeleteRange(walKeys, min, max)
engine_test.go
@@ -742,9 +742,11 @@ func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) {
 func BenchmarkEngine_WritePoints_10(b *testing.B) {
 	benchmarkEngine_WritePoints(b, 10)
 }
+
 func BenchmarkEngine_WritePoints_100(b *testing.B) {
 	benchmarkEngine_WritePoints(b, 100)
 }
+
 func BenchmarkEngine_WritePoints_1000(b *testing.B) {
 	benchmarkEngine_WritePoints(b, 1000)
 }
ring.go (new file)
@@ -0,0 +1,218 @@
+package tsm1
+
+import (
+	"fmt"
+	"sort"
+	"sync"
+	"sync/atomic"
+
+	"github.com/cespare/xxhash"
+)
+
+// partitions is the number of partitions we use in the ring's continuum. It
+// basically defines the maximum number of partitions you can have in the ring.
+// If a smaller number of partitions is chosen when creating a ring, then
+// they're evenly spread across this many partitions in the ring.
+const partitions = 256
+
+// ring is a structure that maps series keys to entries.
+//
+// ring is implemented as a crude hash ring, insofar as you can have
+// variable numbers of members in the ring, and the appropriate member for a
+// given series key can always consistently be found. Unlike a true hash ring
+// though, this ring is not resizeable: there must be at most 256 members in the
+// ring, and the number of members must always be a power of 2.
+//
+// ring works as follows: Each member of the ring contains a single store, which
+// contains a map of series keys to entries. A ring always has 256 partitions,
+// and a member takes up one or more of these partitions (depending on how many
+// members are specified to be in the ring).
+//
+// To determine the partition that a series key should be added to, the series
+// key is hashed and the lower 8 bits are used as an index to the ring.
+//
+type ring struct {
+	partitions []*partition // The unique set of partitions in the ring.
+	continuum  []*partition // A mapping of partition to location on the ring continuum.
+
+	// Number of entries held within the ring. This is used to provide a
+	// hint for allocating a []string to return all keys. It will not be
+	// perfectly accurate since it doesn't consider adding duplicate keys,
+	// or trying to remove non-existent keys.
+	entryN int64
+}
+
+func newring(n int) (*ring, error) {
+	if n <= 0 || n > partitions {
+		return nil, fmt.Errorf("invalid number of partitions: %d", n)
+	}
+
+	r := ring{
+		continuum: make([]*partition, partitions), // maximum number of partitions.
+	}
+
+	// The trick here is to map N partitions to all points on the continuum,
+	// such that the low eight bits of a given hash will map directly to one
+	// of the N partitions.
+	for i := 0; i < len(r.continuum); i++ {
+		if (i == 0 || i%(partitions/n) == 0) && len(r.partitions) < n {
+			r.partitions = append(r.partitions, &partition{
+				store:          make(map[string]*entry),
+				entrySizeHints: make(map[uint64]int),
+			})
+		}
+		r.continuum[i] = r.partitions[len(r.partitions)-1]
+	}
+	return &r, nil
+}
+
+// getPartition retrieves the hash ring partition associated with the provided
+// key.
+func (r *ring) getPartition(key string) *partition {
+	return r.continuum[int(uint8(xxhash.Sum64([]byte(key))))]
+}
+
+// entry returns the entry for the given key.
+// entry is safe for use by multiple goroutines.
+func (r *ring) entry(key string) (*entry, bool) {
+	return r.getPartition(key).entry(key)
+}
+
+// write writes values to the entry in the ring's partition associated with key.
+// If no entry exists for the key then one will be created.
+// write is safe for use by multiple goroutines.
+func (r *ring) write(key string, values Values) error {
+	return r.getPartition(key).write(key, values)
+}
+
+// add adds an entry to the ring.
+func (r *ring) add(key string, entry *entry) {
+	r.getPartition(key).add(key, entry)
+	atomic.AddInt64(&r.entryN, 1)
+}
+
+// remove deletes the entry for the given key.
+// remove is safe for use by multiple goroutines.
+func (r *ring) remove(key string) {
+	r.getPartition(key).remove(key)
+	if r.entryN > 0 {
+		atomic.AddInt64(&r.entryN, -1)
+	}
+}
+
+// keys returns all the keys from all partitions in the hash ring. The returned
+// keys will be in order if sorted is true.
+func (r *ring) keys(sorted bool) []string {
+	keys := make([]string, 0, atomic.LoadInt64(&r.entryN))
+	for _, p := range r.partitions {
+		keys = append(keys, p.keys()...)
+	}
+
+	if sorted {
+		sort.Strings(keys)
+	}
+	return keys
+}
+
+// apply applies the provided function to every entry in the ring under a read
+// lock. The provided function will be called with each key and the
+// corresponding entry. The first error encountered will be returned, if any.
+func (r *ring) apply(f func(string, *entry) error) error {
+	var (
+		wg  sync.WaitGroup
+		res = make(chan error, len(r.partitions))
+	)
+
+	for _, p := range r.partitions {
+		wg.Add(1)
+
+		go func(p *partition) {
+			defer wg.Done()
+
+			p.mu.RLock()
+			for k, e := range p.store {
+				if err := f(k, e); err != nil {
+					res <- err
+					p.mu.RUnlock()
+					return
+				}
+			}
+			p.mu.RUnlock()
+		}(p)
+	}
+
+	go func() {
+		wg.Wait()
+		close(res)
+	}()
+
+	// Collect results.
+	for err := range res {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type partition struct {
+	mu    sync.RWMutex
+	store map[string]*entry
+
+	// entrySizeHints maps a key hash to a size hint for new entries.
+	entrySizeHints map[uint64]int
+}
+
+// entry returns the partition's entry for the provided key.
+// It's safe for use by multiple goroutines.
+func (p *partition) entry(key string) (*entry, bool) {
+	p.mu.RLock()
+	e, ok := p.store[key]
+	p.mu.RUnlock()
+	return e, ok
+}
+
+// write writes the values to the entry in the partition, creating the entry
+// if it does not exist.
+// write is safe for use by multiple goroutines.
+func (p *partition) write(key string, values Values) error {
+	p.mu.RLock()
+	e, ok := p.store[key]
+	p.mu.RUnlock()
+	if ok {
+		return e.add(values)
+	}
+
+	e, err := newEntryValues(values)
+	if err != nil {
+		return err
+	}
+
+	p.mu.Lock()
+	p.store[key] = e
+	p.mu.Unlock()
+	return nil
+}
+
+func (p *partition) add(key string, entry *entry) {
+	p.mu.Lock()
+	p.store[key] = entry
+	p.mu.Unlock()
+}
+
+// remove deletes the entry associated with the provided key.
+// remove is safe for use by multiple goroutines.
+func (p *partition) remove(key string) {
+	p.mu.Lock()
+	delete(p.store, key)
+	p.mu.Unlock()
+}
+
+// keys returns an unsorted slice of the keys in the partition.
+func (p *partition) keys() []string {
+	p.mu.RLock()
+	keys := make([]string, 0, len(p.store))
+	for k := range p.store {
+		keys = append(keys, k)
+	}
+	p.mu.RUnlock()
+	return keys
+}
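To see why the continuum works: newring appends a new partition every 256/n slots, so continuum slot i belongs to partition i/(256/n), and the low byte of the xxhash of a key picks the slot. A runnable sketch of that arithmetic (the key format is borrowed from the benchmarks below; n = 16 is just an example value):

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

const partitions = 256

func main() {
	// With n = 16 members, each member owns 256/16 = 16 consecutive
	// continuum slots; uint8(hash) selects the slot, hence the member.
	n := 16
	for _, key := range []string{
		"cpu,host=server-1 value=1",
		"cpu,host=server-2 value=1",
	} {
		slot := int(uint8(xxhash.Sum64([]byte(key))))
		member := slot / (partitions / n) // index of the owning partition
		fmt.Printf("%-28s slot=%3d member=%2d\n", key, slot, member)
	}
}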
ring_test.go (new file)
@@ -0,0 +1,34 @@
+package tsm1
+
+import (
+	"fmt"
+	"testing"
+)
+
+var strSliceRes []string
+
+func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
+	// Add some keys
+	for i := 0; i < keys; i++ {
+		r.add(fmt.Sprintf("cpu,host=server-%d value=1", i), nil)
+	}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		strSliceRes = r.keys(false)
+	}
+}
+
+func BenchmarkRing_keys_100(b *testing.B)    { benchmarkRingkeys(b, MustNewRing(256), 100) }
+func BenchmarkRing_keys_1000(b *testing.B)   { benchmarkRingkeys(b, MustNewRing(256), 1000) }
+func BenchmarkRing_keys_10000(b *testing.B)  { benchmarkRingkeys(b, MustNewRing(256), 10000) }
+func BenchmarkRing_keys_100000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 100000) }
+
+func MustNewRing(n int) *ring {
+	r, err := newring(n)
+	if err != nil {
+		panic(err)
+	}
+	return r
+}