commit
a7c3f20cda
|
@ -16,154 +16,32 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ringShards specifies the number of partitions that the hash ring used to
|
||||
// store the entry mappings contains. It must be a power of 2. From empirical
|
||||
// testing, a value above the number of cores on the machine does not provide
|
||||
// any additional benefit. For now we'll set it to the number of cores on the
|
||||
// largest box we could imagine running influx.
|
||||
const ringShards = 16
|
||||
|
||||
var (
|
||||
// ErrSnapshotInProgress is returned if a snapshot is attempted while one is already running.
|
||||
ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
|
||||
)
|
||||
|
||||
// CacheMemorySizeLimitExceededError is the type of error returned from the cache when
|
||||
// a write would place it over its size limit.
|
||||
type CacheMemorySizeLimitExceededError struct {
|
||||
Size uint64
|
||||
Limit uint64
|
||||
}
|
||||
|
||||
func (c CacheMemorySizeLimitExceededError) Error() string {
|
||||
return fmt.Sprintf("cache-max-memory-size exceeded: (%d/%d)", c.Size, c.Limit)
|
||||
}
|
||||
|
||||
// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation
|
||||
// could not be completed due to exceeding the cache-max-memory-size setting.
|
||||
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
|
||||
return fmt.Errorf("cache-max-memory-size exceeded: (%d/%d)", n, limit)
|
||||
}
|
||||
|
||||
// entry is a set of values and some metadata.
|
||||
type entry struct {
|
||||
mu sync.RWMutex
|
||||
values Values // All stored values.
|
||||
|
||||
// The type of values stored. Read only so doesn't need to be protected by
|
||||
// mu.
|
||||
vtype byte
|
||||
}
|
||||
|
||||
// newEntryValues returns a new instance of entry with the given values. If the
|
||||
// values are not valid, an error is returned.
|
||||
func newEntryValues(values []Value) (*entry, error) {
|
||||
e := &entry{}
|
||||
e.values = make(Values, 0, len(values))
|
||||
e.values = append(e.values, values...)
|
||||
|
||||
// No values, don't check types and ordering
|
||||
if len(values) == 0 {
|
||||
return e, nil
|
||||
}
|
||||
|
||||
et := valueType(values[0])
|
||||
for _, v := range values {
|
||||
// Make sure all the values are the same type
|
||||
if et != valueType(v) {
|
||||
return nil, tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
|
||||
// Set the type of values stored.
|
||||
e.vtype = et
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// add adds the given values to the entry.
|
||||
func (e *entry) add(values []Value) error {
|
||||
if len(values) == 0 {
|
||||
return nil // Nothing to do.
|
||||
}
|
||||
|
||||
// Are any of the new values the wrong type?
|
||||
if e.vtype != 0 {
|
||||
for _, v := range values {
|
||||
if e.vtype != valueType(v) {
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// entry currently has no values, so add the new ones and we're done.
|
||||
e.mu.Lock()
|
||||
if len(e.values) == 0 {
|
||||
e.values = values
|
||||
e.vtype = valueType(values[0])
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append the new values to the existing ones...
|
||||
e.values = append(e.values, values...)
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deduplicate sorts and orders the entry's values. If values are already deduped and sorted,
|
||||
// the function does no work and simply returns.
|
||||
func (e *entry) deduplicate() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if len(e.values) <= 1 {
|
||||
return
|
||||
}
|
||||
e.values = e.values.Deduplicate()
|
||||
}
|
||||
|
||||
// count returns the number of values in this entry.
|
||||
func (e *entry) count() int {
|
||||
e.mu.RLock()
|
||||
n := len(e.values)
|
||||
e.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// filter removes all values with timestamps between min and max inclusive.
|
||||
func (e *entry) filter(min, max int64) {
|
||||
e.mu.Lock()
|
||||
if len(e.values) > 1 {
|
||||
e.values = e.values.Deduplicate()
|
||||
}
|
||||
e.values = e.values.Exclude(min, max)
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
// size returns the size of this entry in bytes.
|
||||
func (e *entry) size() int {
|
||||
e.mu.RLock()
|
||||
sz := e.values.Size()
|
||||
e.mu.RUnlock()
|
||||
return sz
|
||||
}
|
||||
|
||||
// InfluxQLType returns for the entry the data type of its values.
|
||||
func (e *entry) InfluxQLType() (influxql.DataType, error) {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
return e.values.InfluxQLType()
|
||||
}
|
||||
|
||||
// storer is the interface that descibes a cache's store.
|
||||
type storer interface {
|
||||
entry(key []byte) *entry // Get an entry by its key.
|
||||
write(key []byte, values Values) (bool, error) // Write an entry to the store.
|
||||
add(key []byte, entry *entry) // Add a new entry to the store.
|
||||
remove(key []byte) // Remove an entry from the store.
|
||||
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
|
||||
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
|
||||
applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
|
||||
reset() // Reset the store to an initial unused state.
|
||||
split(n int) []storer // Split splits the store into n stores
|
||||
count() int // Count returns the number of keys in the store
|
||||
return CacheMemorySizeLimitExceededError{Size: n, Limit: limit}
|
||||
}
|
||||
|
||||
// Cache maintains an in-memory store of Values for a set of keys.
|
||||
type Cache struct {
|
||||
_ uint64 // Padding for 32 bit struct alignment
|
||||
mu sync.RWMutex
|
||||
store storer
|
||||
store *ring
|
||||
maxSize uint64
|
||||
|
||||
// snapshots are the cache objects that are currently being written to tsm files
|
||||
|
@ -175,53 +53,22 @@ type Cache struct {
|
|||
tracker *cacheTracker
|
||||
lastSnapshot time.Time
|
||||
lastWriteTime time.Time
|
||||
|
||||
// A one time synchronization used to initial the cache with a store. Since the store can allocate a
|
||||
// a large amount memory across shards, we lazily create it.
|
||||
initialize atomic.Value
|
||||
initializedCount uint32
|
||||
}
|
||||
|
||||
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
|
||||
// Only used for engine caches, never for snapshots.
|
||||
func NewCache(maxSize uint64) *Cache {
|
||||
c := &Cache{
|
||||
return &Cache{
|
||||
maxSize: maxSize,
|
||||
store: emptyStore{},
|
||||
store: newRing(),
|
||||
lastSnapshot: time.Now(),
|
||||
tracker: newCacheTracker(newCacheMetrics(nil), nil),
|
||||
}
|
||||
c.initialize.Store(&sync.Once{})
|
||||
return c
|
||||
}
|
||||
|
||||
// init initializes the cache and allocates the underlying store. Once initialized,
|
||||
// the store re-used until Freed.
|
||||
func (c *Cache) init() {
|
||||
if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.store, _ = newring(ringShards)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Free releases the underlying store and memory held by the Cache.
|
||||
func (c *Cache) Free() {
|
||||
if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) {
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.store = emptyStore{}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Write writes the set of values for the key to the cache. This function is goroutine-safe.
|
||||
// It returns an error if the cache will exceed its max size by adding the new values.
|
||||
func (c *Cache) Write(key []byte, values []Value) error {
|
||||
c.init()
|
||||
addedSize := uint64(Values(values).Size())
|
||||
|
||||
// Enough room in the cache?
|
||||
|
@ -259,7 +106,6 @@ func (c *Cache) Write(key []byte, values []Value) error {
|
|||
// values as possible. If one key fails, the others can still succeed and an
|
||||
// error will be returned.
|
||||
func (c *Cache) WriteMulti(values map[string][]Value) error {
|
||||
c.init()
|
||||
var addedSize uint64
|
||||
for _, v := range values {
|
||||
addedSize += uint64(Values(v).Size())
|
||||
|
@ -320,8 +166,6 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
|
|||
// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
|
||||
// are being flushed, and resets the current cache with new values.
|
||||
func (c *Cache) Snapshot() (*Cache, error) {
|
||||
c.init()
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
|
@ -334,13 +178,8 @@ func (c *Cache) Snapshot() (*Cache, error) {
|
|||
|
||||
// If no snapshot exists, create a new one, otherwise update the existing snapshot
|
||||
if c.snapshot == nil {
|
||||
store, err := newring(ringShards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.snapshot = &Cache{
|
||||
store: store,
|
||||
store: newRing(),
|
||||
tracker: newCacheTracker(c.tracker.metrics, c.tracker.labels),
|
||||
}
|
||||
}
|
||||
|
@ -384,8 +223,6 @@ func (c *Cache) Deduplicate() {
|
|||
// ClearSnapshot removes the snapshot cache from the list of flushing caches and
|
||||
// adjusts the size.
|
||||
func (c *Cache) ClearSnapshot(success bool) {
|
||||
c.init()
|
||||
|
||||
c.mu.RLock()
|
||||
snapStore := c.snapshot.store
|
||||
c.mu.RUnlock()
|
||||
|
@ -550,8 +387,6 @@ func (c *Cache) Values(key []byte) Values {
|
|||
// with timestamps between min and max contained in the bucket identified
|
||||
// by name from the cache.
|
||||
func (c *Cache) DeleteBucketRange(name []byte, min, max int64) {
|
||||
c.init()
|
||||
|
||||
// TODO(edd/jeff): find a way to optimize lock usage
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -859,16 +694,3 @@ func valueType(v Value) byte {
|
|||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
type emptyStore struct{}
|
||||
|
||||
func (e emptyStore) entry(key []byte) *entry { return nil }
|
||||
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
|
||||
func (e emptyStore) add(key []byte, entry *entry) {}
|
||||
func (e emptyStore) remove(key []byte) {}
|
||||
func (e emptyStore) keys(sorted bool) [][]byte { return nil }
|
||||
func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil }
|
||||
func (e emptyStore) applySerial(f func([]byte, *entry) error) error { return nil }
|
||||
func (e emptyStore) reset() {}
|
||||
func (e emptyStore) split(n int) []storer { return nil }
|
||||
func (e emptyStore) count() int { return 0 }
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
// entry is a set of values and some metadata.
|
||||
type entry struct {
|
||||
mu sync.RWMutex
|
||||
values Values // All stored values.
|
||||
|
||||
// The type of values stored. Read only so doesn't need to be protected by mu.
|
||||
vtype byte
|
||||
}
|
||||
|
||||
// newEntryValues returns a new instance of entry with the given values. If the
|
||||
// values are not valid, an error is returned.
|
||||
func newEntryValues(values []Value) (*entry, error) {
|
||||
e := &entry{}
|
||||
e.values = make(Values, 0, len(values))
|
||||
e.values = append(e.values, values...)
|
||||
|
||||
// No values, don't check types and ordering
|
||||
if len(values) == 0 {
|
||||
return e, nil
|
||||
}
|
||||
|
||||
et := valueType(values[0])
|
||||
for _, v := range values {
|
||||
// Make sure all the values are the same type
|
||||
if et != valueType(v) {
|
||||
return nil, tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
|
||||
// Set the type of values stored.
|
||||
e.vtype = et
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// add adds the given values to the entry.
|
||||
func (e *entry) add(values []Value) error {
|
||||
if len(values) == 0 {
|
||||
return nil // Nothing to do.
|
||||
}
|
||||
|
||||
// Are any of the new values the wrong type?
|
||||
if e.vtype != 0 {
|
||||
for _, v := range values {
|
||||
if e.vtype != valueType(v) {
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// entry currently has no values, so add the new ones and we're done.
|
||||
e.mu.Lock()
|
||||
if len(e.values) == 0 {
|
||||
e.values = values
|
||||
e.vtype = valueType(values[0])
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append the new values to the existing ones...
|
||||
e.values = append(e.values, values...)
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deduplicate sorts and orders the entry's values. If values are already deduped and sorted,
|
||||
// the function does no work and simply returns.
|
||||
func (e *entry) deduplicate() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if len(e.values) <= 1 {
|
||||
return
|
||||
}
|
||||
e.values = e.values.Deduplicate()
|
||||
}
|
||||
|
||||
// count returns the number of values in this entry.
|
||||
func (e *entry) count() int {
|
||||
e.mu.RLock()
|
||||
n := len(e.values)
|
||||
e.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// filter removes all values with timestamps between min and max inclusive.
|
||||
func (e *entry) filter(min, max int64) {
|
||||
e.mu.Lock()
|
||||
if len(e.values) > 1 {
|
||||
e.values = e.values.Deduplicate()
|
||||
}
|
||||
e.values = e.values.Exclude(min, max)
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
// size returns the size of this entry in bytes.
|
||||
func (e *entry) size() int {
|
||||
e.mu.RLock()
|
||||
sz := e.values.Size()
|
||||
e.mu.RUnlock()
|
||||
return sz
|
||||
}
|
||||
|
||||
// InfluxQLType returns for the entry the data type of its values.
|
||||
func (e *entry) InfluxQLType() (influxql.DataType, error) {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
return e.values.InfluxQLType()
|
||||
}
|
|
@ -1,8 +1,6 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
@ -105,44 +103,32 @@ func TestCache_CacheWriteMulti(t *testing.T) {
|
|||
|
||||
// Tests that the cache stats and size are correctly maintained during writes.
|
||||
func TestCache_WriteMulti_Stats(t *testing.T) {
|
||||
limit := uint64(1)
|
||||
c := NewCache(limit)
|
||||
ms := NewTestStore()
|
||||
c.store = ms
|
||||
|
||||
// Not enough room in the cache.
|
||||
v := NewValue(1, 1.0)
|
||||
values := map[string][]Value{"foo": {v, v}}
|
||||
if got, exp := c.WriteMulti(values), ErrCacheMemorySizeLimitExceeded(uint64(v.Size()*2), limit); !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %q, expected %q", got, exp)
|
||||
}
|
||||
vf := NewValue(1, 1.0)
|
||||
vi := NewValue(1, int64(1))
|
||||
c := NewCache(60)
|
||||
|
||||
// Fail one of the values in the write.
|
||||
c = NewCache(50)
|
||||
c.init()
|
||||
c.store = ms
|
||||
|
||||
ms.writef = func(key []byte, v Values) (bool, error) {
|
||||
if bytes.Equal(key, []byte("foo")) {
|
||||
return false, errors.New("write failed")
|
||||
}
|
||||
return true, nil
|
||||
if err := c.WriteMulti(map[string][]Value{"foo": {vf}}); err != nil {
|
||||
t.Fatalf("expected no error. got %v", err)
|
||||
}
|
||||
if err := c.WriteMulti(map[string][]Value{"foo": {vi}, "bar": {vf}}); err == nil {
|
||||
t.Fatal("got no error")
|
||||
}
|
||||
|
||||
values = map[string][]Value{"foo": {v, v}, "bar": {v}}
|
||||
if got, exp := c.WriteMulti(values), errors.New("write failed"); !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
// Not enough room in the cache.
|
||||
if err := c.WriteMulti(map[string][]Value{"foo": {vf, vf}}); err == nil {
|
||||
t.Fatal("got no error")
|
||||
}
|
||||
|
||||
// Cache size decreased correctly.
|
||||
if got, exp := c.Size(), uint64(16)+3; got != exp {
|
||||
if got, exp := c.Size(), uint64(3+3*8+3+8); got != exp {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
|
||||
// Write stats updated
|
||||
if got, exp := atomic.LoadUint64(&c.tracker.writesDropped), uint64(1); got != exp {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
} else if got, exp := atomic.LoadUint64(&c.tracker.writesErr), uint64(1); got != exp {
|
||||
} else if got, exp := atomic.LoadUint64(&c.tracker.writesErr), uint64(2); got != exp {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
@ -764,37 +750,10 @@ func mustMarshalEntry(entry wal.WALEntry) (wal.WalEntryType, []byte) {
|
|||
return entry.Type(), snappy.Encode(b, b)
|
||||
}
|
||||
|
||||
// TestStore implements the storer interface and can be used to mock out a
|
||||
// Cache's storer implememation.
|
||||
type TestStore struct {
|
||||
entryf func(key []byte) *entry
|
||||
writef func(key []byte, values Values) (bool, error)
|
||||
addf func(key []byte, entry *entry)
|
||||
removef func(key []byte)
|
||||
keysf func(sorted bool) [][]byte
|
||||
applyf func(f func([]byte, *entry) error) error
|
||||
applySerialf func(f func([]byte, *entry) error) error
|
||||
resetf func()
|
||||
splitf func(n int) []storer
|
||||
countf func() int
|
||||
}
|
||||
|
||||
func NewTestStore() *TestStore { return &TestStore{} }
|
||||
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
|
||||
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
|
||||
func (s *TestStore) add(key []byte, entry *entry) { s.addf(key, entry) }
|
||||
func (s *TestStore) remove(key []byte) { s.removef(key) }
|
||||
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }
|
||||
func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) }
|
||||
func (s *TestStore) applySerial(f func([]byte, *entry) error) error { return s.applySerialf(f) }
|
||||
func (s *TestStore) reset() { s.resetf() }
|
||||
func (s *TestStore) split(n int) []storer { return s.splitf(n) }
|
||||
func (s *TestStore) count() int { return s.countf() }
|
||||
|
||||
var fvSize = uint64(NewValue(1, float64(1)).Size())
|
||||
|
||||
func BenchmarkCacheFloatEntries(b *testing.B) {
|
||||
cache := NewCache(uint64(b.N) * fvSize)
|
||||
cache := NewCache(uint64(b.N)*fvSize + 4)
|
||||
vals := make([][]Value, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
vals[i] = []Value{NewValue(1, float64(i))}
|
||||
|
@ -815,7 +774,7 @@ type points struct {
|
|||
|
||||
func BenchmarkCacheParallelFloatEntries(b *testing.B) {
|
||||
c := b.N * runtime.GOMAXPROCS(0)
|
||||
cache := NewCache(uint64(c) * fvSize * 10)
|
||||
cache := NewCache(uint64(c)*fvSize*10 + 20*5)
|
||||
vals := make([]points, c)
|
||||
for i := 0; i < c; i++ {
|
||||
v := make([]Value, 10)
|
||||
|
|
|
@ -419,11 +419,6 @@ func (e *Engine) disableSnapshotCompactions() {
|
|||
e.mu.Lock()
|
||||
e.snapDone = nil
|
||||
e.mu.Unlock()
|
||||
|
||||
// If the cache is empty, free up its resources as well.
|
||||
if e.Cache.Size() == 0 {
|
||||
e.Cache.Free()
|
||||
}
|
||||
}
|
||||
|
||||
// ScheduleFullCompaction will force the engine to fully compact all data stored.
|
||||
|
@ -587,12 +582,6 @@ func (e *Engine) IsIdle() bool {
|
|||
return cacheEmpty && e.compactionTracker.AllActive() == 0 && e.CompactionPlan.FullyCompacted()
|
||||
}
|
||||
|
||||
// Free releases any resources held by the engine to free up memory or CPU.
|
||||
func (e *Engine) Free() error {
|
||||
e.Cache.Free()
|
||||
return e.FileStore.Free()
|
||||
}
|
||||
|
||||
// WritePoints saves the set of points in the engine.
|
||||
func (e *Engine) WritePoints(points []models.Point) error {
|
||||
collection := tsdb.NewSeriesCollection(points)
|
||||
|
|
|
@ -345,19 +345,6 @@ func (f *FileStore) Files() []TSMFile {
|
|||
return f.files
|
||||
}
|
||||
|
||||
// Free releases any resources held by the FileStore. The resources will be re-acquired
|
||||
// if necessary if they are needed after freeing them.
|
||||
func (f *FileStore) Free() error {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
for _, f := range f.files {
|
||||
if err := f.Free(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CurrentGeneration returns the current generation of the TSM files.
|
||||
func (f *FileStore) CurrentGeneration() int {
|
||||
f.mu.RLock()
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -9,11 +8,11 @@ import (
|
|||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
)
|
||||
|
||||
// partitions is the number of partitions we used in the ring's continuum. It
|
||||
// numPartitions is the number of partitions we used in the ring's continuum. It
|
||||
// basically defines the maximum number of partitions you can have in the ring.
|
||||
// If a smaller number of partitions are chosen when creating a ring, then
|
||||
// they're evenly spread across this many partitions in the ring.
|
||||
const partitions = 16
|
||||
const numPartitions = 16
|
||||
|
||||
// ring is a structure that maps series keys to entries.
|
||||
//
|
||||
|
@ -24,12 +23,12 @@ const partitions = 16
|
|||
// ring, and the number of members must always be a power of 2.
|
||||
//
|
||||
// ring works as follows: Each member of the ring contains a single store, which
|
||||
// contains a map of series keys to entries. A ring always has 256 partitions,
|
||||
// contains a map of series keys to entries. A ring always has 16 partitions,
|
||||
// and a member takes up one or more of these partitions (depending on how many
|
||||
// members are specified to be in the ring)
|
||||
//
|
||||
// To determine the partition that a series key should be added to, the series
|
||||
// key is hashed and the first 8 bits are used as an index to the ring.
|
||||
// key is hashed and the least significant 4 bits are used as an index to the ring.
|
||||
//
|
||||
type ring struct {
|
||||
// Number of keys within the ring. This is used to provide a hint for
|
||||
|
@ -40,33 +39,16 @@ type ring struct {
|
|||
|
||||
// The unique set of partitions in the ring.
|
||||
// len(partitions) <= len(continuum)
|
||||
partitions []*partition
|
||||
partitions [numPartitions]*partition
|
||||
}
|
||||
|
||||
// newring returns a new ring initialised with n partitions. n must always be a
|
||||
// power of 2, and for performance reasons should be larger than the number of
|
||||
// cores on the host. The supported set of values for n is:
|
||||
//
|
||||
// {1, 2, 4, 8, 16, 32, 64, 128, 256}.
|
||||
//
|
||||
func newring(n int) (*ring, error) {
|
||||
if n <= 0 || n > partitions {
|
||||
return nil, fmt.Errorf("invalid number of paritions: %d", n)
|
||||
}
|
||||
|
||||
r := ring{
|
||||
partitions: make([]*partition, n), // maximum number of partitions.
|
||||
}
|
||||
|
||||
// The trick here is to map N partitions to all points on the continuum,
|
||||
// such that the first eight bits of a given hash will map directly to one
|
||||
// of the N partitions.
|
||||
// newring returns a new ring initialised with numPartitions partitions.
|
||||
func newRing() *ring {
|
||||
r := new(ring)
|
||||
for i := 0; i < len(r.partitions); i++ {
|
||||
r.partitions[i] = &partition{
|
||||
store: make(map[string]*entry),
|
||||
}
|
||||
r.partitions[i] = &partition{store: make(map[string]*entry)}
|
||||
}
|
||||
return &r, nil
|
||||
return r
|
||||
}
|
||||
|
||||
// reset resets the ring so it can be reused. Before removing references to entries
|
||||
|
@ -81,10 +63,9 @@ func (r *ring) reset() {
|
|||
r.keysHint = 0
|
||||
}
|
||||
|
||||
// getPartition retrieves the hash ring partition associated with the provided
|
||||
// key.
|
||||
// getPartition retrieves the hash ring partition associated with the provided key.
|
||||
func (r *ring) getPartition(key []byte) *partition {
|
||||
return r.partitions[int(xxhash.Sum64(key)%partitions)]
|
||||
return r.partitions[int(xxhash.Sum64(key)%numPartitions)]
|
||||
}
|
||||
|
||||
// entry returns the entry for the given key.
|
||||
|
@ -201,16 +182,15 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *ring) split(n int) []storer {
|
||||
func (r *ring) split(n int) []*ring {
|
||||
var keys int
|
||||
storers := make([]storer, n)
|
||||
storers := make([]*ring, n)
|
||||
for i := 0; i < n; i++ {
|
||||
storers[i], _ = newring(len(r.partitions))
|
||||
storers[i] = newRing()
|
||||
}
|
||||
|
||||
for i, p := range r.partitions {
|
||||
r := storers[i%n].(*ring)
|
||||
r.partitions[i] = p
|
||||
storers[i%n].partitions[i] = p
|
||||
keys += len(p.store)
|
||||
}
|
||||
return storers
|
||||
|
|
|
@ -7,48 +7,12 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestRing_newRing(t *testing.T) {
|
||||
examples := []struct {
|
||||
n int
|
||||
returnErr bool
|
||||
}{
|
||||
{n: 1}, {n: 2}, {n: 4}, {n: 8}, {n: 16}, {n: 32, returnErr: true},
|
||||
{n: 0, returnErr: true}, {n: 3, returnErr: true},
|
||||
}
|
||||
|
||||
for i, example := range examples {
|
||||
r, err := newring(example.n)
|
||||
if err != nil {
|
||||
if example.returnErr {
|
||||
continue // expecting an error.
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if got, exp := len(r.partitions), example.n; got != exp {
|
||||
t.Fatalf("[Example %d] got %v, expected %v", i, got, exp)
|
||||
}
|
||||
|
||||
// Check partitions distributed correctly
|
||||
partitions := make([]*partition, 0)
|
||||
for i, partition := range r.partitions {
|
||||
if i == 0 || partition != partitions[len(partitions)-1] {
|
||||
partitions = append(partitions, partition)
|
||||
}
|
||||
}
|
||||
|
||||
if got, exp := len(partitions), example.n; got != exp {
|
||||
t.Fatalf("[Example %d] got %v, expected %v", i, got, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var strSliceRes [][]byte
|
||||
|
||||
func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
|
||||
// Add some keys
|
||||
for i := 0; i < keys; i++ {
|
||||
r.add([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), nil)
|
||||
r.add([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), new(entry))
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
@ -58,10 +22,10 @@ func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkRing_keys_100(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 100) }
|
||||
func BenchmarkRing_keys_1000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 1000) }
|
||||
func BenchmarkRing_keys_10000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 10000) }
|
||||
func BenchmarkRing_keys_100000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 100000) }
|
||||
func BenchmarkRing_keys_100(b *testing.B) { benchmarkRingkeys(b, newRing(), 100) }
|
||||
func BenchmarkRing_keys_1000(b *testing.B) { benchmarkRingkeys(b, newRing(), 1000) }
|
||||
func BenchmarkRing_keys_10000(b *testing.B) { benchmarkRingkeys(b, newRing(), 10000) }
|
||||
func BenchmarkRing_keys_100000(b *testing.B) { benchmarkRingkeys(b, newRing(), 100000) }
|
||||
|
||||
func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
|
||||
vals := make([][]byte, keys)
|
||||
|
@ -69,7 +33,7 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
|
|||
// Add some keys
|
||||
for i := 0; i < keys; i++ {
|
||||
vals[i] = []byte(fmt.Sprintf("cpu,host=server-%d field1=value1,field2=value2,field4=value4,field5=value5,field6=value6,field7=value7,field8=value1,field9=value2,field10=value4,field11=value5,field12=value6,field13=value7", i))
|
||||
r.add(vals[i], nil)
|
||||
r.add(vals[i], new(entry))
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
@ -79,10 +43,8 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkRing_getPartition_100(b *testing.B) { benchmarkRingGetPartition(b, MustNewRing(256), 100) }
|
||||
func BenchmarkRing_getPartition_1000(b *testing.B) {
|
||||
benchmarkRingGetPartition(b, MustNewRing(256), 1000)
|
||||
}
|
||||
func BenchmarkRing_getPartition_100(b *testing.B) { benchmarkRingGetPartition(b, newRing(), 100) }
|
||||
func BenchmarkRing_getPartition_1000(b *testing.B) { benchmarkRingGetPartition(b, newRing(), 1000) }
|
||||
|
||||
func benchmarkRingWrite(b *testing.B, r *ring, n int) {
|
||||
b.ReportAllocs()
|
||||
|
@ -114,31 +76,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkRing_write_1_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100) }
|
||||
func BenchmarkRing_write_1_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 1000) }
|
||||
func BenchmarkRing_write_1_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 10000) }
|
||||
func BenchmarkRing_write_1_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100000) }
|
||||
func BenchmarkRing_write_4_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100) }
|
||||
func BenchmarkRing_write_4_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 1000) }
|
||||
func BenchmarkRing_write_4_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 10000) }
|
||||
func BenchmarkRing_write_4_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100000) }
|
||||
func BenchmarkRing_write_32_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 100) }
|
||||
func BenchmarkRing_write_32_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 1000) }
|
||||
func BenchmarkRing_write_32_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 10000) }
|
||||
func BenchmarkRing_write_32_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 100000) }
|
||||
func BenchmarkRing_write_128_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 100) }
|
||||
func BenchmarkRing_write_128_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 1000) }
|
||||
func BenchmarkRing_write_128_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 10000) }
|
||||
func BenchmarkRing_write_128_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100000) }
|
||||
func BenchmarkRing_write_256_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100) }
|
||||
func BenchmarkRing_write_256_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 1000) }
|
||||
func BenchmarkRing_write_256_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 10000) }
|
||||
func BenchmarkRing_write_256_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100000) }
|
||||
|
||||
func MustNewRing(n int) *ring {
|
||||
r, err := newring(n)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return r
|
||||
}
|
||||
func BenchmarkRing_write_1_100(b *testing.B) { benchmarkRingWrite(b, newRing(), 100) }
|
||||
func BenchmarkRing_write_1_1000(b *testing.B) { benchmarkRingWrite(b, newRing(), 1000) }
|
||||
func BenchmarkRing_write_1_10000(b *testing.B) { benchmarkRingWrite(b, newRing(), 10000) }
|
||||
func BenchmarkRing_write_1_100000(b *testing.B) { benchmarkRingWrite(b, newRing(), 100000) }
|
||||
|
|
Loading…
Reference in New Issue