commit 1e345aa7a1
@@ -88,7 +88,8 @@
 # compact-full-write-cold-duration = "4h"

 # The maximum number of concurrent full and level compactions that can run at one time. A
-# value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
+# value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater
+# than 0 limits compactions to that value. This setting does not apply
 # to cache snapshotting.
 # max-concurrent-compactions = 0

@@ -358,10 +359,10 @@
 # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
 # read-buffer = 0

 # Multi-value plugins can be handled two ways.
 # "split" will parse and store the multi-value plugin data into separate measurements
 # "join" will parse and store the multi-value plugin as a single multi-value measurement.
 # "split" is the default behavior for backward compatibility with previous versions of influxdb.
 # parse-multivalue-plugin = "split"
 ###
 ### [opentsdb]

@@ -10,10 +10,27 @@ func NewFixed(limit int) Fixed {
     return make(Fixed, limit)
 }

+// Idle returns true if all of the limiter's capacity is available.
+func (t Fixed) Idle() bool {
+    return len(t) == cap(t)
+}
+
+// TryTake attempts to take a token and returns true if successful, otherwise it returns false.
+func (t Fixed) TryTake() bool {
+    select {
+    case t <- struct{}{}:
+        return true
+    default:
+        return false
+    }
+}
+
+// Take attempts to take a token and blocks until one is available.
 func (t Fixed) Take() {
     t <- struct{}{}
 }

+// Release releases a token back to the limiter.
 func (t Fixed) Release() {
     <-t
 }
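For orientation, here is a minimal, self-contained sketch of how these limiter primitives behave (illustrative only; assumes the `github.com/influxdata/influxdb/pkg/limiter` import path):

    package main

    import (
        "fmt"

        "github.com/influxdata/influxdb/pkg/limiter"
    )

    func main() {
        // Fixed is a buffered channel of empty structs; its capacity is the token count.
        l := limiter.NewFixed(2)

        l.Take()                 // blocks until a token is free
        fmt.Println(l.TryTake()) // true: second token still available
        fmt.Println(l.TryTake()) // false: limiter exhausted
        fmt.Println(l.Idle())    // false: tokens are checked out

        l.Release()
        l.Release()
        fmt.Println(l.Idle()) // true: all capacity available again
    }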
@@ -49,7 +49,7 @@ const (
     DefaultMaxValuesPerTag = 100000

     // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
-    // that can run at one time. A value of results in runtime.GOMAXPROCS(0) used at runtime.
+    // that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
     DefaultMaxConcurrentCompactions = 0
 )

@@ -144,11 +144,13 @@ func NewEngine(id uint64, i Index, database, path string, walPath string, option

 // EngineOptions represents the options used to initialize the engine.
 type EngineOptions struct {
     EngineVersion string
     IndexVersion  string
     ShardID       uint64
     InmemIndex    interface{} // shared in-memory index
-    CompactionLimiter limiter.Fixed
+
+    HiPriCompactionLimiter limiter.Fixed
+    LoPriCompactionLimiter limiter.Fixed

     Config Config
 }

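How the two limiter options get populated is not part of this diff; a hypothetical wiring from the `max-concurrent-compactions` setting, with an assumed 3:1 split between the pools:

    package main

    import (
        "runtime"

        "github.com/influxdata/influxdb/pkg/limiter"
        "github.com/influxdata/influxdb/tsdb"
    )

    // newEngineOptions is illustrative only: the 50% default and the 3:1 ratio
    // are assumptions, not this commit's actual store wiring.
    func newEngineOptions() tsdb.EngineOptions {
        total := runtime.GOMAXPROCS(0) / 2
        if total < 2 {
            total = 2
        }
        hi := total * 3 / 4
        if hi < 1 {
            hi = 1
        }
        lo := total - hi
        if lo < 1 {
            lo = 1
        }
        return tsdb.EngineOptions{
            HiPriCompactionLimiter: limiter.NewFixed(hi),
            LoPriCompactionLimiter: limiter.NewFixed(lo),
        }
    }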
@@ -118,7 +118,7 @@ func (e *entry) deduplicate() {
     e.mu.Lock()
     defer e.mu.Unlock()

-    if len(e.values) == 0 {
+    if len(e.values) <= 1 {
         return
     }
     e.values = e.values.Deduplicate()
@@ -176,6 +176,8 @@ type storer interface {
     apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
     applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
     reset()                                         // Reset the store to an initial unused state.
+    split(n int) []storer                           // Split splits the store into n stores.
+    count() int                                     // Count returns the number of keys in the store.
 }

 // Cache maintains an in-memory store of Values for a set of keys.
@@ -436,6 +438,15 @@ func (c *Cache) Deduplicate() {
 func (c *Cache) ClearSnapshot(success bool) {
     c.init()

+    c.mu.RLock()
+    snapStore := c.snapshot.store
+    c.mu.RUnlock()
+
+    // reset the snapshot store outside of the write lock
+    if success {
+        snapStore.reset()
+    }
+
     c.mu.Lock()
     defer c.mu.Unlock()

@@ -445,8 +456,7 @@ func (c *Cache) ClearSnapshot(success bool) {
         c.snapshotAttempts = 0
         c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

-        // Reset the snapshot's store, and reset the snapshot to a fresh Cache.
-        c.snapshot.store.reset()
+        // Reset the snapshot to a fresh Cache.
         c.snapshot = &Cache{
             store: c.snapshot.store,
         }
@@ -477,6 +487,13 @@ func (c *Cache) MaxSize() uint64 {
     return c.maxSize
 }

+func (c *Cache) Count() int {
+    c.mu.RLock()
+    n := c.store.count()
+    c.mu.RUnlock()
+    return n
+}
+
 // Keys returns a sorted slice of all keys under management by the cache.
 func (c *Cache) Keys() [][]byte {
     c.mu.RLock()
@@ -485,6 +502,21 @@ func (c *Cache) Keys() [][]byte {
     return store.keys(true)
 }

+func (c *Cache) Split(n int) []*Cache {
+    if n == 1 {
+        return []*Cache{c}
+    }
+
+    caches := make([]*Cache, n)
+    storers := c.store.split(n)
+    for i := 0; i < n; i++ {
+        caches[i] = &Cache{
+            store: storers[i],
+        }
+    }
+    return caches
+}
+
 // unsortedKeys returns a slice of all keys under management by the cache. The
 // keys are not sorted.
 func (c *Cache) unsortedKeys() [][]byte {
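Split is the hook that lets snapshot writes fan out. A short in-package usage sketch (the test name and keys are hypothetical):

    func TestCache_SplitCoversKeys(t *testing.T) {
        c := NewCache(0, "")
        if err := c.Write([]byte("cpu"), Values{NewValue(1, 1.0)}); err != nil {
            t.Fatal(err)
        }
        if err := c.Write([]byte("mem"), Values{NewValue(1, 2.0)}); err != nil {
            t.Fatal(err)
        }

        // Each key lands in exactly one split, so the split counts sum to
        // the cache's own count.
        total := 0
        for _, sub := range c.Split(4) {
            total += sub.Count()
        }
        if total != c.Count() {
            t.Fatalf("splits cover %d keys, cache has %d", total, c.Count())
        }
    }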
@@ -765,3 +797,5 @@ func (e emptyStore) keys(sorted bool) [][]byte { return nil
 func (e emptyStore) apply(f func([]byte, *entry) error) error       { return nil }
 func (e emptyStore) applySerial(f func([]byte, *entry) error) error { return nil }
 func (e emptyStore) reset()                                         {}
+func (e emptyStore) split(n int) []storer                           { return nil }
+func (e emptyStore) count() int                                     { return 0 }
@@ -759,6 +759,45 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
     }
 }

+func TestCache_Split(t *testing.T) {
+    v0 := NewValue(1, 1.0)
+    v1 := NewValue(2, 2.0)
+    v2 := NewValue(3, 3.0)
+    values := Values{v0, v1, v2}
+    valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
+
+    c := NewCache(0, "")
+
+    if err := c.Write([]byte("foo"), values); err != nil {
+        t.Fatalf("failed to write key foo to cache: %s", err.Error())
+    }
+    if err := c.Write([]byte("bar"), values); err != nil {
+        t.Fatalf("failed to write key bar to cache: %s", err.Error())
+    }
+
+    if err := c.Write([]byte("baz"), values); err != nil {
+        t.Fatalf("failed to write key baz to cache: %s", err.Error())
+    }
+
+    if n := c.Size(); n != 3*valuesSize+9 {
+        t.Fatalf("cache size incorrect after 3 writes, exp %d, got %d", 3*valuesSize+9, n)
+    }
+
+    splits := c.Split(3)
+    keys := make(map[string]int)
+    for _, s := range splits {
+        for _, k := range s.Keys() {
+            keys[string(k)] = s.Values(k).Size()
+        }
+    }
+
+    for _, key := range []string{"foo", "bar", "baz"} {
+        if _, ok := keys[key]; !ok {
+            t.Fatalf("missing key, exp %s, got %v", key, nil)
+        }
+    }
+}
+
 func mustTempDir() string {
     dir, err := ioutil.TempDir("", "tsm1-test")
     if err != nil {
@@ -797,6 +836,8 @@ type TestStore struct {
     applyf       func(f func([]byte, *entry) error) error
     applySerialf func(f func([]byte, *entry) error) error
     resetf       func()
+    splitf       func(n int) []storer
+    countf       func() int
 }

 func NewTestStore() *TestStore { return &TestStore{} }
@@ -808,6 +849,8 @@ func (s *TestStore) keys(sorted bool) [][]byte { return s.k
 func (s *TestStore) apply(f func([]byte, *entry) error) error       { return s.applyf(f) }
 func (s *TestStore) applySerial(f func([]byte, *entry) error) error { return s.applySerialf(f) }
 func (s *TestStore) reset()                                         { s.resetf() }
+func (s *TestStore) split(n int) []storer                           { return s.splitf(n) }
+func (s *TestStore) count() int                                     { return s.countf() }

 var fvSize = uint64(NewValue(1, float64(1)).Size())
@@ -6,10 +6,6 @@

 package tsm1

-import (
-    "runtime"
-)
-
 // merge combines the next set of blocks into merged blocks.
 func (k *tsmKeyIterator) mergeFloat() {
     // No blocks left, or pending merged values, we're done
@@ -92,10 +88,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks {
             }

             k.mergedFloatValues = k.mergedFloatValues.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -120,8 +112,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -134,8 +124,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -170,8 +158,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks {

         k.mergedFloatValues = k.mergedFloatValues.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -300,10 +286,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks {
             }

             k.mergedIntegerValues = k.mergedIntegerValues.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -328,8 +310,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -342,8 +322,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -378,8 +356,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks {

         k.mergedIntegerValues = k.mergedIntegerValues.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -508,10 +484,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks {
             }

             k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -536,8 +508,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -550,8 +520,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -586,8 +554,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks {

         k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -716,10 +682,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks {
             }

             k.mergedStringValues = k.mergedStringValues.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -744,8 +706,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -758,8 +718,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -794,8 +752,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks {

         k.mergedStringValues = k.mergedStringValues.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -924,10 +880,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks {
             }

             k.mergedBooleanValues = k.mergedBooleanValues.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -952,8 +904,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -966,8 +916,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -1002,8 +950,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks {

         k.mergedBooleanValues = k.mergedBooleanValues.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -1,9 +1,5 @@
 package tsm1

-import (
-    "runtime"
-)
-
 {{range .}}

 // merge combines the next set of blocks into merged blocks.
@@ -88,10 +84,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
             }

             k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
-
-            // Allow other goroutines to run
-            runtime.Gosched()
-
         }
     }

@@ -116,8 +108,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
                 break
             }
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }

         if k.fast {
@@ -130,8 +120,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {

             chunked = append(chunked, k.blocks[i])
             i++
-            // Allow other goroutines to run
-            runtime.Gosched()
         }
     }

@@ -166,8 +154,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {

         k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
         i++
-        // Allow other goroutines to run
-        runtime.Gosched()
     }

     k.blocks = k.blocks[i:]
@@ -227,9 +227,16 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
         minGenerations = level + 1
     }

+    // Each compaction group should run against 4 generations. For levels 1 and 3, since these
+    // can get created much more quickly, bump the grouping to 8 to keep file counts lower.
+    groupSize := 4
+    if level == 1 || level == 3 {
+        groupSize = 8
+    }
+
     var cGroups []CompactionGroup
     for _, group := range levelGroups {
-        for _, chunk := range group.chunk(4) {
+        for _, chunk := range group.chunk(groupSize) {
             var cGroup CompactionGroup
             var hasTombstones bool
             for _, gen := range chunk {
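To make the effect of `groupSize` concrete, a small standalone sketch of the chunking arithmetic (the generation count is hypothetical):

    package main

    import "fmt"

    func main() {
        gens := make([]int, 17) // 17 hypothetical generations at one level
        for _, groupSize := range []int{4, 8} {
            groups := 0
            for start := 0; start < len(gens); start += groupSize {
                groups++
            }
            // groupSize 4 yields 5 compaction groups; groupSize 8 yields 3
            // larger groups, so fewer compactions run and file counts stay lower.
            fmt.Println(groupSize, "->", groups)
        }
    }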
@@ -697,8 +704,43 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
         return nil, errSnapshotsDisabled
     }

-    iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
-    files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
+    concurrency := 1
+    card := cache.Count()
+    if card >= 1024*1024 {
+        concurrency = card / (1024 * 1024)
+        if concurrency < 1 {
+            concurrency = 1
+        }
+        if concurrency > 4 {
+            concurrency = 4
+        }
+    }
+    splits := cache.Split(concurrency)
+
+    type res struct {
+        files []string
+        err   error
+    }
+
+    resC := make(chan res, concurrency)
+    for i := 0; i < concurrency; i++ {
+        go func(sp *Cache) {
+            iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
+            files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
+            resC <- res{files: files, err: err}
+        }(splits[i])
+    }
+
+    var err error
+    files := make([]string, 0, concurrency)
+    for i := 0; i < concurrency; i++ {
+        result := <-resC
+        if result.err != nil {
+            err = result.err
+        }
+        files = append(files, result.files...)
+    }
+
     // See if we were disabled while writing a snapshot
     c.mu.RLock()
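The sizing rule reads as one snapshot writer per million cached keys, clamped to the range [1, 4]; a standalone sketch:

    package main

    import "fmt"

    // concurrencyFor mirrors the sizing above: one writer per 2^20 cached
    // keys, never fewer than 1 and never more than 4.
    func concurrencyFor(card int) int {
        concurrency := 1
        if card >= 1024*1024 {
            concurrency = card / (1024 * 1024)
            if concurrency > 4 {
                concurrency = 4
            }
        }
        return concurrency
    }

    func main() {
        for _, card := range []int{1000, 1 << 20, 3 << 20, 100 << 20} {
            fmt.Println(card, "->", concurrencyFor(card)) // 1, 1, 3, 4
        }
    }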
@@ -1337,56 +1379,87 @@ func (c *cacheKeyIterator) encode() {
     n := len(c.ready)

     // Divide the keyset across each CPU
-    chunkSize := 128
+    chunkSize := 1
     idx := uint64(0)

     for i := 0; i < concurrency; i++ {
         // Run one goroutine per CPU and encode a section of the key space concurrently
         go func() {
+            tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
+            fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
+            benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
+            uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
+            senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
+            ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)
+
+            defer putTimeEncoder(tenc)
+            defer putFloatEncoder(fenc)
+            defer putBooleanEncoder(benc)
+            defer putUnsignedEncoder(uenc)
+            defer putStringEncoder(senc)
+            defer putIntegerEncoder(ienc)
+
             for {
-                start := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize
-                if start >= n {
+                i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize
+
+                if i >= n {
                     break
                 }
-                end := start + chunkSize
-                if end > n {
-                    end = n
+
+                key := c.order[i]
+                values := c.cache.values(key)
+
+                for len(values) > 0 {
+
+                    end := len(values)
+                    if end > c.size {
+                        end = c.size
+                    }
+
+                    minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
+                    var b []byte
+                    var err error
+                    tenc.Reset()
+
+                    maxTime = values[end-1].UnixNano()
+
+                    switch values[0].(type) {
+                    case FloatValue:
+                        fenc.Reset()
+                        b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
+                    case IntegerValue:
+                        ienc.Reset()
+                        b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
+                    case UnsignedValue:
+                        uenc.Reset()
+                        b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
+                    case BooleanValue:
+                        benc.Reset()
+                        b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
+                    case StringValue:
+                        senc.Reset()
+                        b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
+                    default:
+                        b, err = Values(values[:end]).Encode(nil)
+                    }
+
+                    values = values[end:]
+
+                    c.blocks[i] = append(c.blocks[i], cacheBlock{
+                        k:       key,
+                        minTime: minTime,
+                        maxTime: maxTime,
+                        b:       b,
+                        err:     err,
+                    })
                 }
-                c.encodeRange(start, end)
+                // Notify this key is fully encoded
+                c.ready[i] <- struct{}{}
             }
         }()
     }
 }

-func (c *cacheKeyIterator) encodeRange(start, stop int) {
-    for i := start; i < stop; i++ {
-        key := c.order[i]
-        values := c.cache.values(key)
-
-        for len(values) > 0 {
-            minTime, maxTime := values[0].UnixNano(), values[len(values)-1].UnixNano()
-            var b []byte
-            var err error
-            if len(values) > c.size {
-                maxTime = values[c.size-1].UnixNano()
-                b, err = Values(values[:c.size]).Encode(nil)
-                values = values[c.size:]
-            } else {
-                b, err = Values(values).Encode(nil)
-                values = values[:0]
-            }
-            c.blocks[i] = append(c.blocks[i], cacheBlock{
-                k:       key,
-                minTime: minTime,
-                maxTime: maxTime,
-                b:       b,
-                err:     err,
-            })
-        }
-        // Notify this key is fully encoded
-        c.ready[i] <- struct{}{}
-    }
-}
-
 func (c *cacheKeyIterator) Next() bool {
     if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 {
         c.blocks[c.i] = c.blocks[c.i][1:]
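With `chunkSize` now 1, each goroutine claims one key at a time via the shared atomic counter. A generic standalone sketch of that work-claiming pattern:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        work := []string{"cpu", "mem", "disk", "net", "swap"}
        var idx uint64 // next unclaimed item; shared by all workers

        var wg sync.WaitGroup
        for w := 0; w < 3; w++ {
            wg.Add(1)
            go func(worker int) {
                defer wg.Done()
                for {
                    // Claim one item: AddUint64 returns the new value, so
                    // subtracting the chunk size (1) yields our slot.
                    i := int(atomic.AddUint64(&idx, 1)) - 1
                    if i >= len(work) {
                        return
                    }
                    fmt.Printf("worker %d encodes %s\n", worker, work[i])
                }
            }(w)
        }
        wg.Wait()
    }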
@@ -1689,6 +1689,22 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
             Path: "06-01.tsm1",
             Size: 1 * 1024 * 1024,
         },
+        tsm1.FileStat{
+            Path: "07-01.tsm1",
+            Size: 1 * 1024 * 1024,
+        },
+        tsm1.FileStat{
+            Path: "08-01.tsm1",
+            Size: 1 * 1024 * 1024,
+        },
+        tsm1.FileStat{
+            Path: "09-01.tsm1",
+            Size: 1 * 1024 * 1024,
+        },
+        tsm1.FileStat{
+            Path: "10-01.tsm1",
+            Size: 1 * 1024 * 1024,
+        },
     }

     cp := tsm1.NewDefaultPlanner(
@@ -1699,8 +1715,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
         }, tsdb.DefaultCompactFullWriteColdDuration,
     )

-    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
-    expFiles2 := []tsm1.FileStat{data[4], data[5]}
+    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
+    expFiles2 := []tsm1.FileStat{data[8], data[9]}

     tsm := cp.PlanLevel(1)
     if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@@ -56,7 +56,7 @@ func (a Values) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a Values) Deduplicate() Values {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -268,7 +268,7 @@ func (a FloatValues) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a FloatValues) Deduplicate() FloatValues {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -524,7 +524,7 @@ func (a IntegerValues) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a IntegerValues) Deduplicate() IntegerValues {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -780,7 +780,7 @@ func (a UnsignedValues) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a UnsignedValues) Deduplicate() UnsignedValues {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -1036,7 +1036,7 @@ func (a StringValues) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a StringValues) Deduplicate() StringValues {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -1292,7 +1292,7 @@ func (a BooleanValues) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a BooleanValues) Deduplicate() BooleanValues {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

@@ -53,7 +53,7 @@ func (a {{.Name}}Values) assertOrdered() {
 // Deduplicate returns a new slice with any values that have the same timestamp removed.
 // The Value that appears last in the slice is the one that is kept.
 func (a {{.Name}}Values) Deduplicate() {{.Name}}Values {
-    if len(a) == 0 {
+    if len(a) <= 1 {
         return a
     }

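The `<= 1` guard encodes the fact that a slice with at most one element cannot contain duplicate timestamps, so Deduplicate can return its receiver untouched. A quick in-package check (hypothetical test, not part of this commit):

    func TestDeduplicate_Short(t *testing.T) {
        for _, a := range []Values{nil, {NewValue(10, 1.0)}} {
            // Both the empty and the single-element slice come back as-is.
            if got := a.Deduplicate(); len(got) != len(a) {
                t.Fatalf("expected %d values back, got %d", len(a), len(got))
            }
        }
    }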
@@ -359,32 +359,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
     // frame-of-reference and run length encoding.
     tsenc := getTimeEncoder(len(values))

-    var b []byte
-    err := func() error {
-        for _, v := range values {
-            vv := v.(FloatValue)
-            tsenc.Write(vv.unixnano)
-            venc.Write(vv.value)
-        }
-        venc.Flush()
-
-        // Encoded timestamp values
-        tb, err := tsenc.Bytes()
-        if err != nil {
-            return err
-        }
-        // Encoded float values
-        vb, err := venc.Bytes()
-        if err != nil {
-            return err
-        }
-
-        // Prepend the first timestamp of the block in the first 8 bytes and the block
-        // in the next byte, followed by the block
-        b = packBlock(buf, BlockFloat64, tb, vb)
-
-        return nil
-    }()
-
+    b, err := encodeFloatBlockUsing(buf, values, tsenc, venc)
     putTimeEncoder(tsenc)
     putFloatEncoder(venc)
@@ -392,6 +367,33 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
     return b, err
 }

+func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) {
+    tsenc.Reset()
+    venc.Reset()
+
+    for _, v := range values {
+        vv := v.(FloatValue)
+        tsenc.Write(vv.unixnano)
+        venc.Write(vv.value)
+    }
+    venc.Flush()
+
+    // Encoded timestamp values
+    tb, err := tsenc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+    // Encoded float values
+    vb, err := venc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+
+    // Prepend the first timestamp of the block in the first 8 bytes and the block
+    // in the next byte, followed by the block
+    return packBlock(buf, BlockFloat64, tb, vb), nil
+}
+
 // DecodeFloatBlock decodes the float block from the byte slice
 // and appends the float values to a.
 func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
@@ -499,30 +501,7 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
     // Encode timestamps using an adaptive encoder
     tsenc := getTimeEncoder(len(values))

-    var b []byte
-    err := func() error {
-        for _, v := range values {
-            vv := v.(BooleanValue)
-            tsenc.Write(vv.unixnano)
-            venc.Write(vv.value)
-        }
-
-        // Encoded timestamp values
-        tb, err := tsenc.Bytes()
-        if err != nil {
-            return err
-        }
-        // Encoded float values
-        vb, err := venc.Bytes()
-        if err != nil {
-            return err
-        }
-
-        // Prepend the first timestamp of the block in the first 8 bytes and the block
-        // in the next byte, followed by the block
-        b = packBlock(buf, BlockBoolean, tb, vb)
-        return nil
-    }()
-
+    b, err := encodeBooleanBlockUsing(buf, values, tsenc, venc)
     putTimeEncoder(tsenc)
     putBooleanEncoder(venc)
@@ -530,6 +509,32 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
     return b, err
 }

+func encodeBooleanBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc BooleanEncoder) ([]byte, error) {
+    tenc.Reset()
+    venc.Reset()
+
+    for _, v := range values {
+        vv := v.(BooleanValue)
+        tenc.Write(vv.unixnano)
+        venc.Write(vv.value)
+    }
+
+    // Encoded timestamp values
+    tb, err := tenc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+    // Encoded boolean values
+    vb, err := venc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+
+    // Prepend the first timestamp of the block in the first 8 bytes and the block
+    // in the next byte, followed by the block
+    return packBlock(buf, BlockBoolean, tb, vb), nil
+}
+
 // DecodeBooleanBlock decodes the boolean block from the byte slice
 // and appends the boolean values to a.
 func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) {
@@ -622,39 +627,42 @@ func (v IntegerValue) String() string {
 }

 func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
-    tsEnc := getTimeEncoder(len(values))
-    vEnc := getIntegerEncoder(len(values))
+    tenc := getTimeEncoder(len(values))
+    venc := getIntegerEncoder(len(values))

-    var b []byte
-    err := func() error {
-        for _, v := range values {
-            vv := v.(IntegerValue)
-            tsEnc.Write(vv.unixnano)
-            vEnc.Write(vv.value)
-        }
-
-        // Encoded timestamp values
-        tb, err := tsEnc.Bytes()
-        if err != nil {
-            return err
-        }
-        // Encoded int64 values
-        vb, err := vEnc.Bytes()
-        if err != nil {
-            return err
-        }
-
-        // Prepend the first timestamp of the block in the first 8 bytes
-        b = packBlock(buf, BlockInteger, tb, vb)
-        return nil
-    }()
-
-    putTimeEncoder(tsEnc)
-    putIntegerEncoder(vEnc)
+    b, err := encodeIntegerBlockUsing(buf, values, tenc, venc)
+
+    putTimeEncoder(tenc)
+    putIntegerEncoder(venc)

     return b, err
 }

+func encodeIntegerBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc IntegerEncoder) ([]byte, error) {
+    tenc.Reset()
+    venc.Reset()
+
+    for _, v := range values {
+        vv := v.(IntegerValue)
+        tenc.Write(vv.unixnano)
+        venc.Write(vv.value)
+    }
+
+    // Encoded timestamp values
+    tb, err := tenc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+    // Encoded int64 values
+    vb, err := venc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+
+    // Prepend the first timestamp of the block in the first 8 bytes
+    return packBlock(buf, BlockInteger, tb, vb), nil
+}
+
 // DecodeIntegerBlock decodes the integer block from the byte slice
 // and appends the integer values to a.
 func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) {
@@ -748,39 +756,42 @@ func (v UnsignedValue) String() string {
 }

 func encodeUnsignedBlock(buf []byte, values []Value) ([]byte, error) {
-    tsEnc := getTimeEncoder(len(values))
-    vEnc := getUnsignedEncoder(len(values))
+    tenc := getTimeEncoder(len(values))
+    venc := getUnsignedEncoder(len(values))

-    var b []byte
-    err := func() error {
-        for _, v := range values {
-            vv := v.(UnsignedValue)
-            tsEnc.Write(vv.unixnano)
-            vEnc.Write(int64(vv.value))
-        }
-
-        // Encoded timestamp values
-        tb, err := tsEnc.Bytes()
-        if err != nil {
-            return err
-        }
-        // Encoded int64 values
-        vb, err := vEnc.Bytes()
-        if err != nil {
-            return err
-        }
-
-        // Prepend the first timestamp of the block in the first 8 bytes
-        b = packBlock(buf, BlockUnsigned, tb, vb)
-        return nil
-    }()
-
-    putTimeEncoder(tsEnc)
-    putUnsignedEncoder(vEnc)
+    b, err := encodeUnsignedBlockUsing(buf, values, tenc, venc)
+
+    putTimeEncoder(tenc)
+    putUnsignedEncoder(venc)

     return b, err
 }

+func encodeUnsignedBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc IntegerEncoder) ([]byte, error) {
+    tenc.Reset()
+    venc.Reset()
+
+    for _, v := range values {
+        vv := v.(UnsignedValue)
+        tenc.Write(vv.unixnano)
+        venc.Write(int64(vv.value))
+    }
+
+    // Encoded timestamp values
+    tb, err := tenc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+    // Encoded int64 values
+    vb, err := venc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+
+    // Prepend the first timestamp of the block in the first 8 bytes
+    return packBlock(buf, BlockUnsigned, tb, vb), nil
+}
+
 // DecodeUnsignedBlock decodes the unsigned integer block from the byte slice
 // and appends the unsigned integer values to a.
 func DecodeUnsignedBlock(block []byte, a *[]UnsignedValue) ([]UnsignedValue, error) {
@@ -874,40 +885,42 @@ func (v StringValue) String() string {
 }

 func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
-    tsEnc := getTimeEncoder(len(values))
-    vEnc := getStringEncoder(len(values) * len(values[0].(StringValue).value))
+    tenc := getTimeEncoder(len(values))
+    venc := getStringEncoder(len(values) * len(values[0].(StringValue).value))

-    var b []byte
-    err := func() error {
-        for _, v := range values {
-            vv := v.(StringValue)
-            tsEnc.Write(vv.unixnano)
-            vEnc.Write(vv.value)
-        }
-
-        // Encoded timestamp values
-        tb, err := tsEnc.Bytes()
-        if err != nil {
-            return err
-        }
-        // Encoded string values
-        vb, err := vEnc.Bytes()
-        if err != nil {
-            return err
-        }
-
-        // Prepend the first timestamp of the block in the first 8 bytes
-        b = packBlock(buf, BlockString, tb, vb)
-
-        return nil
-    }()
-
-    putTimeEncoder(tsEnc)
-    putStringEncoder(vEnc)
+    b, err := encodeStringBlockUsing(buf, values, tenc, venc)
+
+    putTimeEncoder(tenc)
+    putStringEncoder(venc)

     return b, err
 }

+func encodeStringBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc StringEncoder) ([]byte, error) {
+    tenc.Reset()
+    venc.Reset()
+
+    for _, v := range values {
+        vv := v.(StringValue)
+        tenc.Write(vv.unixnano)
+        venc.Write(vv.value)
+    }
+
+    // Encoded timestamp values
+    tb, err := tenc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+    // Encoded string values
+    vb, err := venc.Bytes()
+    if err != nil {
+        return nil, err
+    }
+
+    // Prepend the first timestamp of the block in the first 8 bytes
+    return packBlock(buf, BlockString, tb, vb), nil
+}
+
 // DecodeStringBlock decodes the string block from the byte slice
 // and appends the string values to a.
 func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) {
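The new `encode*BlockUsing` variants exist so a caller can check encoders out of the pool once and reuse them across many blocks, which is exactly what the snapshot encode loop does. A short in-package sketch of that pattern (the `encodeAll` helper is hypothetical):

    // encodeAll encodes many float blocks while reusing pooled encoders,
    // instead of a get/put pair per block.
    func encodeAll(blocks [][]Value, write func([]byte)) error {
        tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
        fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
        defer putTimeEncoder(tenc)
        defer putFloatEncoder(fenc)

        for _, vals := range blocks { // float values only, for brevity
            b, err := encodeFloatBlockUsing(nil, vals, tenc, fenc)
            if err != nil {
                return err
            }
            write(b) // write stands in for the TSM writer
        }
        return nil
    }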
@@ -26,6 +26,7 @@ import (
     "github.com/influxdata/influxdb/tsdb"
     _ "github.com/influxdata/influxdb/tsdb/index"
     "github.com/influxdata/influxdb/tsdb/index/inmem"
+    "github.com/influxdata/influxdb/tsdb/index/tsi1"
     "github.com/uber-go/zap"
 )

@@ -136,8 +137,10 @@ type Engine struct {

     stats *EngineStatistics

-    // The limiter for concurrent compactions
-    compactionLimiter limiter.Fixed
+    // Limiters for concurrent compactions. The low priority limiter is for level 3 and 4
+    // compactions. The high priority is for level 1 and 2 compactions.
+    loPriCompactionLimiter limiter.Fixed
+    hiPriCompactionLimiter limiter.Fixed
 }

 // NewEngine returns a new instance of Engine.
@@ -175,8 +178,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
         CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
         CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
         enableCompactionsOnOpen:       true,
         stats: &EngineStatistics{},
-        compactionLimiter: opt.CompactionLimiter,
+        loPriCompactionLimiter: opt.LoPriCompactionLimiter,
+        hiPriCompactionLimiter: opt.HiPriCompactionLimiter,
     }

     // Attach fieldset to index.
@@ -1289,6 +1293,10 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) {
 // onFileStoreReplace is a callback handler invoked when the FileStore
 // has replaced one set of TSM files with a new set.
 func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
+    if e.index.Type() == tsi1.IndexName {
+        return
+    }
+
     // Load any new series keys to the index
     readers := make([]chan seriesKey, 0, len(newFiles))
     for _, r := range newFiles {
@@ -1341,46 +1349,33 @@ func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
 type compactionStrategy struct {
     compactionGroups []CompactionGroup

-    // concurrency determines how many compactions groups will be started
-    // concurrently. These groups may be limited by the global limiter if
-    // enabled.
-    concurrency int
     fast        bool
     description string
+    level       int

     durationStat *int64
     activeStat   *int64
     successStat  *int64
     errorStat    *int64

     logger    zap.Logger
     compactor *Compactor
     fileStore *FileStore
-    limiter   limiter.Fixed
-    engine    *Engine
+    loPriLimiter limiter.Fixed
+    hiPriLimiter limiter.Fixed
+
+    engine *Engine
 }

 // Apply concurrently compacts all the groups in a compaction strategy.
 func (s *compactionStrategy) Apply() {
     start := time.Now()

-    // cap concurrent compaction groups to no more than 4 at a time.
-    concurrency := s.concurrency
-    if concurrency == 0 {
-        concurrency = 4
-    }
-
-    throttle := limiter.NewFixed(concurrency)
     var wg sync.WaitGroup
     for i := range s.compactionGroups {
         wg.Add(1)
         go func(groupNum int) {
             defer wg.Done()
-
-            // limit concurrent compaction groups
-            throttle.Take()
-            defer throttle.Release()
-
             s.compactGroup(groupNum)
         }(i)
     }
@@ -1391,10 +1386,31 @@ func (s *compactionStrategy) Apply() {

 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
-    // Limit concurrent compactions if we have a limiter
-    if cap(s.limiter) > 0 {
-        s.limiter.Take()
-        defer s.limiter.Release()
+    // Level 1 and 2 are high priority and have a larger slice of the pool. If all
+    // the high priority capacity is used up, they can steal from the low priority
+    // pool as well if there is capacity. Otherwise, they wait on the high priority
+    // limiter until a running compaction completes. Level 3 and 4 are low priority
+    // as they are generally larger compactions and more expensive to run. They can
+    // steal a little from the high priority limiter if there is no high priority work.
+    switch s.level {
+    case 1, 2:
+        if s.hiPriLimiter.TryTake() {
+            defer s.hiPriLimiter.Release()
+        } else if s.loPriLimiter.TryTake() {
+            defer s.loPriLimiter.Release()
+        } else {
+            s.hiPriLimiter.Take()
+            defer s.hiPriLimiter.Release()
+        }
+    default:
+        if s.loPriLimiter.TryTake() {
+            defer s.loPriLimiter.Release()
+        } else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
+            defer s.hiPriLimiter.Release()
+        } else {
+            s.loPriLimiter.Take()
+            defer s.loPriLimiter.Release()
+        }
     }

     group := s.compactionGroups[groupNum]
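The switch above is easier to read as a policy table. The same rules as a standalone helper over the limiter type shown earlier (a sketch, not code from this commit):

    // acquire claims a compaction slot for the given level and returns the
    // matching release function. High priority (levels 1-2) prefers the hi
    // pool, falls back to stealing a lo slot, and otherwise blocks on hi.
    // Low priority steals from hi only when hi is completely idle.
    func acquire(level int, hi, lo limiter.Fixed) (release func()) {
        if level == 1 || level == 2 {
            if hi.TryTake() {
                return hi.Release
            }
            if lo.TryTake() {
                return lo.Release
            }
            hi.Take()
            return hi.Release
        }
        if lo.TryTake() {
            return lo.Release
        }
        if hi.Idle() && hi.TryTake() {
            return hi.Release
        }
        lo.Take()
        return lo.Release
    }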
@@ -1457,14 +1473,15 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
     }

     return &compactionStrategy{
-        concurrency:      4,
         compactionGroups: compactionGroups,
         logger:           e.logger,
         fileStore:        e.FileStore,
         compactor:        e.Compactor,
         fast:             fast,
-        limiter:          e.compactionLimiter,
+        loPriLimiter:     e.loPriCompactionLimiter,
+        hiPriLimiter:     e.hiPriCompactionLimiter,
         engine:           e,
+        level:            level,

         description: fmt.Sprintf("level %d", level),
         activeStat:  &e.stats.TSMCompactionsActive[level-1],
@@ -1490,14 +1507,15 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
     }

     s := &compactionStrategy{
-        concurrency:      1,
         compactionGroups: compactionGroups,
         logger:           e.logger,
         fileStore:        e.FileStore,
        compactor:        e.Compactor,
         fast:             optimize,
-        limiter:          e.compactionLimiter,
+        loPriLimiter:     e.loPriCompactionLimiter,
+        hiPriLimiter:     e.hiPriCompactionLimiter,
         engine:           e,
+        level:            4,
     }

     if optimize {
@@ -881,9 +881,10 @@ func TestIndirectIndex_Entries(t *testing.T) {
     index := NewIndexWriter()
     index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100)
     index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200)
-    index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 100)
     exp := index.Entries([]byte("cpu"))

+    index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 100)
+
     b, err := index.MarshalBinary()
     if err != nil {
         t.Fatalf("unexpected error marshaling index: %v", err)
@@ -981,27 +982,16 @@ func TestIndirectIndex_Type(t *testing.T) {
     }
 }

-func TestIndirectIndex_Keys(t *testing.T) {
+func TestDirectIndex_KeyCount(t *testing.T) {
     index := NewIndexWriter()
     index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20)
     index.Add([]byte("cpu"), BlockFloat64, 1, 2, 20, 30)
     index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 20)

-    keys := index.Keys()
-
     // 2 distinct keys
-    if got, exp := len(keys), 2; got != exp {
+    if got, exp := index.KeyCount(), 2; got != exp {
         t.Fatalf("length mismatch: got %v, exp %v", got, exp)
     }
-
-    // Keys should be sorted
-    if got, exp := string(keys[0]), "cpu"; got != exp {
-        t.Fatalf("key mismatch: got %v, exp %v", got, exp)
-    }
-
-    if got, exp := string(keys[1]), "mem"; got != exp {
-        t.Fatalf("key mismatch: got %v, exp %v", got, exp)
-    }
 }

 func TestBlockIterator_Single(t *testing.T) {
@@ -32,19 +32,15 @@ const partitions = 4096
 // key is hashed and the first 8 bits are used as an index to the ring.
 //
 type ring struct {
-	// The unique set of partitions in the ring.
-	// len(partitions) <= len(continuum)
-	partitions []*partition
-
-	// A mapping of partition to location on the ring continuum. This is used
-	// to lookup a partition.
-	continuum []*partition
-
 	// Number of keys within the ring. This is used to provide a hint for
 	// allocating the return values in keys(). It will not be perfectly accurate
 	// since it doesn't consider adding duplicate keys, or trying to remove non-
 	// existent keys.
 	keysHint int64

+	// The unique set of partitions in the ring.
+	// len(partitions) <= len(continuum)
+	partitions []*partition
 }

 // newring returns a new ring initialised with n partitions. n must always be a

@@ -59,20 +55,16 @@ func newring(n int) (*ring, error) {
 	}

 	r := ring{
-		continuum: make([]*partition, partitions), // maximum number of partitions.
+		partitions: make([]*partition, n), // maximum number of partitions.
 	}

 	// The trick here is to map N partitions to all points on the continuum,
 	// such that the first eight bits of a given hash will map directly to one
 	// of the N partitions.
-	for i := 0; i < len(r.continuum); i++ {
-		if (i == 0 || i%(partitions/n) == 0) && len(r.partitions) < n {
-			r.partitions = append(r.partitions, &partition{
-				store:          make(map[string]*entry),
-				entrySizeHints: make(map[uint64]int),
-			})
-		}
-		r.continuum[i] = r.partitions[len(r.partitions)-1]
+	for i := 0; i < len(r.partitions); i++ {
+		r.partitions[i] = &partition{
+			store: make(map[string]*entry),
+		}
 	}
 	return &r, nil
 }

@@ -92,7 +84,7 @@ func (r *ring) reset() {
 // getPartition retrieves the hash ring partition associated with the provided
 // key.
 func (r *ring) getPartition(key []byte) *partition {
-	return r.continuum[int(xxhash.Sum64(key)%partitions)]
+	return r.partitions[int(xxhash.Sum64(key)%partitions)]
 }

 // entry returns the entry for the given key.
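After this change, getPartition is a single hash-and-mod: a key always maps to the same element of r.partitions, with no continuum indirection. A small sketch of that mapping, assuming the xxhash import path:

package main

import (
	"fmt"

	"github.com/cespare/xxhash" // import path assumed
)

const partitions = 4096 // matches the const in the hunk header above

func main() {
	// The same key always hashes to the same partition slot.
	key := []byte("cpu,host=server01")
	fmt.Println(xxhash.Sum64(key) % partitions)
}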
@@ -137,6 +129,14 @@ func (r *ring) keys(sorted bool) [][]byte {
 	return keys
 }

+func (r *ring) count() int {
+	var n int
+	for _, p := range r.partitions {
+		n += p.count()
+	}
+	return n
+}
+
 // apply applies the provided function to every entry in the ring under a read
 // lock using a separate goroutine for each partition. The provided function
 // will be called with each key and the corresponding entry. The first error
@@ -188,6 +188,9 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
 	for _, p := range r.partitions {
 		p.mu.RLock()
 		for k, e := range p.store {
+			if e.count() == 0 {
+				continue
+			}
 			if err := f([]byte(k), e); err != nil {
 				p.mu.RUnlock()
 				return err
@@ -198,16 +201,25 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
 	return nil
 }

+func (r *ring) split(n int) []storer {
+	var keys int
+	storers := make([]storer, n)
+	for i := 0; i < n; i++ {
+		storers[i], _ = newring(len(r.partitions))
+	}
+
+	for i, p := range r.partitions {
+		r := storers[i%n].(*ring)
+		r.partitions[i] = p
+		keys += len(p.store)
+	}
+	return storers
+}
+
 // partition provides safe access to a map of series keys to entries.
 type partition struct {
 	mu    sync.RWMutex
 	store map[string]*entry
-
-	// entrySizeHints stores hints for appropriate sizes to pre-allocate the
-	// []Values in an entry. entrySizeHints will only contain hints for entries
-	// that were present prior to the most recent snapshot, preventing unbounded
-	// growth over time.
-	entrySizeHints map[uint64]int
 }

 // entry returns the partition's entry for the provided key.
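The new split method deals partitions out round-robin: source partition i lands in destination storer i%n. A worked example of that assignment with 16 partitions split four ways:

package main

import "fmt"

func main() {
	// Partition i of the source ring goes to destination storer i%n.
	const nPartitions, n = 16, 4
	owners := make([][]int, n)
	for i := 0; i < nPartitions; i++ {
		owners[i%n] = append(owners[i%n], i)
	}
	fmt.Println(owners) // [[0 4 8 12] [1 5 9 13] [2 6 10 14] [3 7 11 15]]
}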
@@ -240,8 +252,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
 	}

 	// Create a new entry using a preallocated size if we have a hint available.
-	hint, _ := p.entrySizeHints[xxhash.Sum64(key)]
-	e, err := newEntryValues(values, hint)
+	e, err := newEntryValues(values, 32)
 	if err != nil {
 		return false, err
 	}
@@ -269,7 +280,10 @@ func (p *partition) remove(key []byte) {
 func (p *partition) keys() [][]byte {
 	p.mu.RLock()
 	keys := make([][]byte, 0, len(p.store))
-	for k := range p.store {
+	for k, v := range p.store {
+		if v.count() == 0 {
+			continue
+		}
 		keys = append(keys, []byte(k))
 	}
 	p.mu.RUnlock()
@@ -279,21 +293,25 @@ func (p *partition) keys() [][]byte {
 // reset resets the partition by reinitialising the store. reset returns hints
 // about sizes that the entries within the store could be reallocated with.
 func (p *partition) reset() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-
-	// Collect the allocated sizes of values for each entry in the store.
-	p.entrySizeHints = make(map[uint64]int)
-	for k, entry := range p.store {
-		// If the capacity is large then there are many values in the entry.
-		// Store a hint to pre-allocate the next time we see the same entry.
-		entry.mu.RLock()
-		if cap(entry.values) > 128 { // 4 x the default entry capacity size.
-			p.entrySizeHints[xxhash.Sum64String(k)] = cap(entry.values)
-		}
-		entry.mu.RUnlock()
-	}
-
-	// Reset the store.
-	p.store = make(map[string]*entry, len(p.store))
+	p.mu.RLock()
+	sz := len(p.store)
+	p.mu.RUnlock()
+
+	newStore := make(map[string]*entry, sz)
+	p.mu.Lock()
+	p.store = newStore
+	p.mu.Unlock()
+}
+
+func (p *partition) count() int {
+	var n int
+	p.mu.RLock()
+	for _, v := range p.store {
+		if v.count() > 0 {
+			n++
+		}
+	}
+	p.mu.RUnlock()
+	return n
 }
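The rewritten reset keeps the write lock as short as possible: it sizes the replacement map under a read lock, allocates it with no lock held, and takes the write lock only for the pointer swap. The same pattern in isolation:

package main

import "sync"

// Sketch of the swap pattern used by the new partition.reset: size the
// replacement map under a read lock, allocate it unlocked, then hold the
// write lock only long enough to swap the pointer.
type store struct {
	mu sync.RWMutex
	m  map[string]int
}

func (s *store) reset() {
	s.mu.RLock()
	sz := len(s.m)
	s.mu.RUnlock()

	fresh := make(map[string]int, sz) // allocation happens outside any lock
	s.mu.Lock()
	s.m = fresh
	s.mu.Unlock()
}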
@@ -31,7 +31,7 @@ func TestRing_newRing(t *testing.T) {

 	// Check partitions distributed correctly
 	partitions := make([]*partition, 0)
-	for i, partition := range r.continuum {
+	for i, partition := range r.partitions {
 		if i == 0 || partition != partitions[len(partitions)-1] {
 			partitions = append(partitions, partition)
 		}
@@ -152,9 +152,6 @@ type IndexWriter interface {
 	// Entries returns all index entries for a key.
 	Entries(key []byte) []IndexEntry

-	// Keys returns the unique set of keys in the index.
-	Keys() [][]byte
-
 	// KeyCount returns the count of unique keys in the index.
 	KeyCount() int
@@ -232,12 +229,13 @@ func (e *IndexEntry) String() string {

 // NewIndexWriter returns a new IndexWriter.
 func NewIndexWriter() IndexWriter {
-	return &directIndex{}
+	buf := bytes.NewBuffer(make([]byte, 0, 4096))
+	return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
 }

 // NewIndexWriter returns a new IndexWriter.
 func NewDiskIndexWriter(f *os.File) IndexWriter {
-	return &directIndex{fd: f, w: bufio.NewWriter(f)}
+	return &directIndex{fd: f, w: bufio.NewWriterSize(f, 1024*1024)}
 }

 // indexBlock represent an index information for a series within a TSM file.
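Both constructors now produce a streaming writer: the disk-backed one flushes through a 1 MB bufio.Writer to the file, while the in-memory one streams through a bufio.Writer into a bytes.Buffer, so the two paths share a single write path. A minimal sketch of the in-memory shape:

package main

import (
	"bufio"
	"bytes"
	"fmt"
)

func main() {
	// Same shape as the new NewIndexWriter: writes stream through a
	// bufio.Writer into an in-memory bytes.Buffer.
	buf := bytes.NewBuffer(make([]byte, 0, 4096))
	w := bufio.NewWriter(buf)
	w.WriteString("index bytes")
	w.Flush() // nothing reaches buf until the bufio layer is flushed
	fmt.Println(buf.String())
}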
@@ -249,43 +247,48 @@ type indexBlock struct {
 // directIndex is a simple in-memory index implementation for a TSM file. The full index
 // must fit in memory.
 type directIndex struct {
-	size   uint32
-	blocks []indexBlock
+	keyCount int
+	size     uint32
 	fd     *os.File
-	w      *bufio.Writer
+	buf    *bytes.Buffer
+
+	w *bufio.Writer
+
+	key          []byte
+	indexEntries *indexEntries
 }

 func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
 	// Is this the first block being added?
-	if len(d.blocks) == 0 {
+	if len(d.key) == 0 {
 		// size of the key stored in the index
 		d.size += uint32(2 + len(key))
 		// size of the count of entries stored in the index
 		d.size += indexCountSize

-		d.blocks = append(d.blocks, indexBlock{
-			key: key,
-			entries: &indexEntries{
-				Type: blockType,
-				entries: []IndexEntry{IndexEntry{
-					MinTime: minTime,
-					MaxTime: maxTime,
-					Offset:  offset,
-					Size:    size,
-				}}},
+		d.key = key
+		if d.indexEntries == nil {
+			d.indexEntries = &indexEntries{}
+		}
+		d.indexEntries.Type = blockType
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
+			MinTime: minTime,
+			MaxTime: maxTime,
+			Offset:  offset,
+			Size:    size,
 		})

 		// size of the encoded index entry
 		d.size += indexEntrySize
+		d.keyCount++
 		return
 	}

-	// Find the last block so we can see if were still adding to the same series key.
-	block := d.blocks[len(d.blocks)-1]
-	cmp := bytes.Compare(block.key, key)
+	// See if we're still adding to the same series key.
+	cmp := bytes.Compare(d.key, key)
 	if cmp == 0 {
 		// The last block is still this key
-		block.entries.entries = append(block.entries.entries, IndexEntry{
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
 			MinTime: minTime,
 			MaxTime: maxTime,
 			Offset:  offset,
@@ -296,9 +299,7 @@ func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
 		d.size += indexEntrySize

 	} else if cmp < 0 {
-		if d.w != nil {
-			d.flush(d.w)
-		}
+		d.flush(d.w)
 		// We have a new key that is greater than the last one so we need to add
 		// a new index block section.
@@ -307,38 +308,31 @@ func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
 		// size of the count of entries stored in the index
 		d.size += indexCountSize

-		d.blocks = append(d.blocks, indexBlock{
-			key: key,
-			entries: &indexEntries{
-				Type: blockType,
-				entries: []IndexEntry{IndexEntry{
-					MinTime: minTime,
-					MaxTime: maxTime,
-					Offset:  offset,
-					Size:    size,
-				}}},
+		d.key = key
+		d.indexEntries.Type = blockType
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
+			MinTime: minTime,
+			MaxTime: maxTime,
+			Offset:  offset,
+			Size:    size,
 		})

 		// size of the encoded index entry
 		d.size += indexEntrySize
+		d.keyCount++
 	} else {
 		// Keys can't be added out of order.
-		panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.blocks[len(d.blocks)-1].key)))
+		panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.key)))
 	}
 }

 func (d *directIndex) entries(key []byte) []IndexEntry {
-	if len(d.blocks) == 0 {
+	if len(d.key) == 0 {
 		return nil
 	}

-	if bytes.Equal(d.blocks[len(d.blocks)-1].key, key) {
-		return d.blocks[len(d.blocks)-1].entries.entries
-	}
-
-	i := sort.Search(len(d.blocks), func(i int) bool { return bytes.Compare(d.blocks[i].key, key) >= 0 })
-	if i < len(d.blocks) && bytes.Equal(d.blocks[i].key, key) {
-		return d.blocks[i].entries.entries
+	if bytes.Equal(d.key, key) {
+		return d.indexEntries.entries
 	}

 	return nil
@@ -358,23 +352,11 @@ func (d *directIndex) Entry(key []byte, t int64) *IndexEntry {
 	return nil
 }

-func (d *directIndex) Keys() [][]byte {
-	keys := make([][]byte, 0, len(d.blocks))
-	for _, v := range d.blocks {
-		keys = append(keys, v.key)
-	}
-	return keys
-}
-
 func (d *directIndex) KeyCount() int {
-	return len(d.blocks)
+	return d.keyCount
 }

 func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
-	if d.w == nil {
-		return d.flush(w)
-	}
-
 	if _, err := d.flush(d.w); err != nil {
 		return 0, err
 	}
@@ -383,6 +365,10 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
 		return 0, err
 	}

+	if d.fd == nil {
+		return io.Copy(w, d.buf)
+	}
+
 	if _, err := d.fd.Seek(0, io.SeekStart); err != nil {
 		return 0, err
 	}
@@ -398,50 +384,52 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
 		N int64
 	)

+	if len(d.key) == 0 {
+		return 0, nil
+	}
+
 	// For each key, individual entries are sorted by time
-	for _, ie := range d.blocks {
-		key := ie.key
-		entries := ie.entries
-
-		if entries.Len() > maxIndexEntries {
-			return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)
-		}
-
-		if !sort.IsSorted(entries) {
-			sort.Sort(entries)
-		}
-
-		binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
-		buf[2] = entries.Type
-		binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len()))
-
-		// Append the key length and key
-		if n, err = w.Write(buf[0:2]); err != nil {
-			return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err)
-		}
-		N += int64(n)
-
-		if n, err = w.Write(key); err != nil {
-			return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
-		}
-		N += int64(n)
-
-		// Append the block type and count
-		if n, err = w.Write(buf[2:5]); err != nil {
-			return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err)
-		}
-		N += int64(n)
-
-		// Append each index entry for all blocks for this key
-		var n64 int64
-		if n64, err = entries.WriteTo(w); err != nil {
-			return n64 + N, fmt.Errorf("write: writer entries error: %v", err)
-		}
-		N += n64
-	}
-
-	d.blocks = d.blocks[:0]
+	key := d.key
+	entries := d.indexEntries
+
+	if entries.Len() > maxIndexEntries {
+		return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)
+	}
+
+	if !sort.IsSorted(entries) {
+		sort.Sort(entries)
+	}
+
+	binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
+	buf[2] = entries.Type
+	binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len()))
+
+	// Append the key length and key
+	if n, err = w.Write(buf[0:2]); err != nil {
+		return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err)
+	}
+	N += int64(n)
+
+	if n, err = w.Write(key); err != nil {
+		return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
+	}
+	N += int64(n)
+
+	// Append the block type and count
+	if n, err = w.Write(buf[2:5]); err != nil {
+		return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err)
+	}
+	N += int64(n)
+
+	// Append each index entry for all blocks for this key
+	var n64 int64
+	if n64, err = entries.WriteTo(w); err != nil {
+		return n64 + N, fmt.Errorf("write: writer entries error: %v", err)
+	}
+	N += n64
+
+	d.key = nil
+	d.indexEntries.Type = 0
+	d.indexEntries.entries = d.indexEntries.entries[:0]

 	return N, nil
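Reading the Write calls in flush, each key's index section is laid out as a 2-byte big-endian key length, the key bytes, a 1-byte block type, a 2-byte big-endian entry count, and then the encoded entries from entries.WriteTo. A worked example of the header bytes for key "cpu" with two entries (the type value 0 is illustrative):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Header layout written by flush for one key, per the Write calls above:
	// uint16 key length, key bytes, byte block type, uint16 entry count.
	// (The encoded IndexEntry payload would follow via entries.WriteTo.)
	var out bytes.Buffer
	key := []byte("cpu")
	var buf [5]byte
	binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
	buf[2] = 0                              // block type; 0 is illustrative
	binary.BigEndian.PutUint16(buf[3:5], 2) // two entries for this key

	out.Write(buf[0:2])
	out.Write(key)
	out.Write(buf[2:5])
	fmt.Printf("% x\n", out.Bytes()) // 00 03 63 70 75 00 00 02
}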
@@ -460,14 +448,15 @@ func (d *directIndex) Size() uint32 {
 }

 func (d *directIndex) Close() error {
-	if d.w == nil {
-		return nil
-	}
-
 	// Flush anything remaining in the index
 	if err := d.w.Flush(); err != nil {
 		return err
 	}

+	if d.fd == nil {
+		return nil
+	}
+
 	if err := d.fd.Close(); err != nil {
 		return nil
 	}
@@ -159,15 +159,35 @@ func (s *Store) loadShards() error {
 		err error
 	}

-	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-
 	// Setup a shared limiter for compactions
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
+		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+		if lim < 1 {
+			lim = 1
+		}
+	}
+
+	// Don't allow more compactions to run than cores.
+	if lim > runtime.GOMAXPROCS(0) {
 		lim = runtime.GOMAXPROCS(0)
 	}
-	s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
+
+	// If only one compaction can run at a time, use the same limiter for high and low
+	// priority work.
+	if lim == 1 {
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1)
+		s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter
+	} else {
+		// Split the available high and low priority limiters between the available cores.
+		// The high priority work can steal from low priority at times so it can use the
+		// full limit if there is pending work. The low priority is capped at half the
+		// limit.
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2)
+		s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2)
+	}
+
+	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
 	resC := make(chan *res)
 	var n int
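The split arithmetic gives high priority the extra token when the limit is odd: hi = lim/2 + lim%2 and lo = lim/2, so hi + lo always equals lim. Worked values:

package main

import "fmt"

func main() {
	// hi gets the extra slot when lim is odd; hi + lo == lim in every case.
	for _, lim := range []int{2, 5, 8} {
		hi, lo := lim/2+lim%2, lim/2
		fmt.Printf("lim=%d -> hi=%d lo=%d\n", lim, hi, lo)
	}
	// lim=2 -> hi=1 lo=1; lim=5 -> hi=3 lo=2; lim=8 -> hi=4 lo=4
}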