* fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578)
* fix(influxd): update xxhash, avoid stringtoslicebyte in cache
This commit does 3 things:
* it updates xxhash from v1 to v2; v2 includes an assembly arm version of
Sum64
* it changes the cache storer to write with a string key instead of a
byte slice. The cache only reads the key which WriteMulti already has
as a string so we can avoid a host of allocations when converting back
and forth from immutable strings to mutable byte slices. This includes
updating the cache ring and ring partition to write with a string key
* it updates the xxhash for finding the cache ring partition to use
Sum64String which uses unsafe pointers to directly use a string as a
byte slice since it only reads the string. Note: this now uses an
assembly version because of the v2 xxhash update. Go 1.22 included new
compiler ability to recognize calls of Method([]byte(myString)) and not
make a copy but from looking at the call sites, I'm not sure the
compiler would recognize it as the conversion to a byte slice was
happening several calls earlier.
That's what this change set does. If we are uncomfortable with any of
these, we can do fewer of them (for example, not upgrade xxhash; and/or
not use the specialized Sum64String, etc).
For the performance issue in maz-rr, I see converting string keys to
byte slices taking between 3-5% of cpu usage on both the primary and
secondary. So while this pr doesn't address directly the increased cpu
usage on the secondary, it makes cpu usage less on both which still
feels like a win. I believe these changes are easier to review than
switching to a byte slice pool that is likely needed in other places as
the compiler provides nearly all of the correctness checks we need (we
are relying also on xxhash v2 being correct).
* helps #550
* chore: fix tests/lint
* chore: don't use assembly version; should inline
This 2 line change causes xxhash to use a purego Sum64 implementation
which allows the compiler to see that Sum64 only reads the byte slice
input, which then means it can skip the string to byte slice allocation,
and since it can skip that, it should inline all the calls to
getPartitionStringKey and Sum64, avoiding 1 call to Sum64String which
isn't inlined.
* chore: update ci build file
the ci build doesn't use the make file!!!
* chore: revert "chore: update ci build file"
This reverts commit 94be66fde03e0bbe18004aab25c0e19051406de2.
* chore: revert "chore: don't use assembly version; should inline"
This reverts commit 67d8d06c02e17e91ba643a2991e30a49308a5283.
(cherry picked from commit 1d334c679ca025645ed93518b7832ae676499cd2)
* feat: need to update go sum
---------
Co-authored-by: Phil Bracikowski <13472206+philjb@users.noreply.github.com>
(cherry picked from commit 06ab224516)
db/cherrypick-06ab224
parent
eea87ba94c
commit
9e022e5d12
3
go.mod
3
go.mod
|
@ -8,7 +8,7 @@ require (
|
|||
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40
|
||||
github.com/benbjohnson/tmpl v1.0.0
|
||||
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
|
||||
github.com/go-chi/chi v4.1.0+incompatible
|
||||
|
@ -98,7 +98,6 @@ require (
|
|||
github.com/benbjohnson/immutable v0.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.6.0 // indirect
|
||||
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
|
||||
github.com/dimchansky/utfbom v1.1.0 // indirect
|
||||
|
|
7
go.sum
7
go.sum
|
@ -118,7 +118,6 @@ github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF0
|
|||
github.com/Masterminds/sprig v2.16.0+incompatible h1:QZbMUPxRQ50EKAq3LFMnxddMu88/EUUG3qmxwtDmPsY=
|
||||
github.com/Masterminds/sprig v2.16.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o=
|
||||
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
|
@ -228,12 +227,11 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n
|
|||
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
|
@ -941,7 +939,6 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
|
|||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// Filter represents a bloom filter.
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
"sort"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
)
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
"encoding/binary"
|
||||
"sort"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// HashMap represents a hash map that implements Robin Hood Hashing.
|
||||
|
|
|
@ -165,7 +165,7 @@ const (
|
|||
// storer is the interface that descibes a cache's store.
|
||||
type storer interface {
|
||||
entry(key []byte) *entry // Get an entry by its key.
|
||||
write(key []byte, values Values) (bool, error) // Write an entry to the store.
|
||||
write(key string, values Values) (bool, error) // Write an entry to the store.
|
||||
remove(key []byte) // Remove an entry from the store.
|
||||
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
|
||||
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
|
||||
|
@ -294,7 +294,7 @@ func (c *Cache) Write(key []byte, values []Value) error {
|
|||
return ErrCacheMemorySizeLimitExceeded(n, limit)
|
||||
}
|
||||
|
||||
newKey, err := c.store.write(key, values)
|
||||
newKey, err := c.store.write(string(key), values)
|
||||
if err != nil {
|
||||
atomic.AddInt64(&c.stats.WriteErr, 1)
|
||||
return err
|
||||
|
@ -339,7 +339,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
|
|||
// We'll optimistially set size here, and then decrement it for write errors.
|
||||
c.increaseSize(addedSize)
|
||||
for k, v := range values {
|
||||
newKey, err := store.write([]byte(k), v)
|
||||
newKey, err := store.write(k, v)
|
||||
if err != nil {
|
||||
// The write failed, hold onto the error and adjust the size delta.
|
||||
werr = err
|
||||
|
@ -820,7 +820,7 @@ func (c *Cache) updateSnapshots() {
|
|||
type emptyStore struct{}
|
||||
|
||||
func (e emptyStore) entry(key []byte) *entry { return nil }
|
||||
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
|
||||
func (e emptyStore) write(key string, values Values) (bool, error) { return false, nil }
|
||||
func (e emptyStore) remove(key []byte) {}
|
||||
func (e emptyStore) keys(sorted bool) [][]byte { return nil }
|
||||
func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil }
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -119,8 +118,8 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
|
|||
c.init()
|
||||
c.store = ms
|
||||
|
||||
ms.writef = func(key []byte, v Values) (bool, error) {
|
||||
if bytes.Equal(key, []byte("foo")) {
|
||||
ms.writef = func(key string, v Values) (bool, error) {
|
||||
if key == "foo" {
|
||||
return false, errors.New("write failed")
|
||||
}
|
||||
return true, nil
|
||||
|
@ -871,7 +870,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
|
|||
// Cache's storer implememation.
|
||||
type TestStore struct {
|
||||
entryf func(key []byte) *entry
|
||||
writef func(key []byte, values Values) (bool, error)
|
||||
writef func(key string, values Values) (bool, error)
|
||||
removef func(key []byte)
|
||||
keysf func(sorted bool) [][]byte
|
||||
applyf func(f func([]byte, *entry) error) error
|
||||
|
@ -883,7 +882,7 @@ type TestStore struct {
|
|||
|
||||
func NewTestStore() *TestStore { return &TestStore{} }
|
||||
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
|
||||
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
|
||||
func (s *TestStore) write(key string, values Values) (bool, error) { return s.writef(key, values) }
|
||||
func (s *TestStore) remove(key []byte) { s.removef(key) }
|
||||
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }
|
||||
func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) }
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
)
|
||||
|
||||
|
@ -80,6 +80,12 @@ func (r *ring) getPartition(key []byte) *partition {
|
|||
return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
|
||||
}
|
||||
|
||||
// getPartition retrieves the hash ring partition associated with the provided
|
||||
// key, as a string, which can be faster if you already have a string as this is read only
|
||||
func (r *ring) getPartitionStringKey(key string) *partition {
|
||||
return r.partitions[int(xxhash.Sum64String(key)%uint64(len(r.partitions)))]
|
||||
}
|
||||
|
||||
// entry returns the entry for the given key.
|
||||
// entry is safe for use by multiple goroutines.
|
||||
func (r *ring) entry(key []byte) *entry {
|
||||
|
@ -89,8 +95,8 @@ func (r *ring) entry(key []byte) *entry {
|
|||
// write writes values to the entry in the ring's partition associated with key.
|
||||
// If no entry exists for the key then one will be created.
|
||||
// write is safe for use by multiple goroutines.
|
||||
func (r *ring) write(key []byte, values Values) (bool, error) {
|
||||
return r.getPartition(key).write(key, values)
|
||||
func (r *ring) write(key string, values Values) (bool, error) {
|
||||
return r.getPartitionStringKey(key).write(key, values)
|
||||
}
|
||||
|
||||
// remove deletes the entry for the given key.
|
||||
|
@ -218,9 +224,9 @@ func (p *partition) entry(key []byte) *entry {
|
|||
// write writes the values to the entry in the partition, creating the entry
|
||||
// if it does not exist.
|
||||
// write is safe for use by multiple goroutines.
|
||||
func (p *partition) write(key []byte, values Values) (bool, error) {
|
||||
func (p *partition) write(key string, values Values) (bool, error) {
|
||||
p.mu.RLock()
|
||||
e := p.store[string(key)]
|
||||
e := p.store[key]
|
||||
p.mu.RUnlock()
|
||||
if e != nil {
|
||||
// Hot path.
|
||||
|
@ -231,7 +237,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
|
|||
defer p.mu.Unlock()
|
||||
|
||||
// Check again.
|
||||
if e = p.store[string(key)]; e != nil {
|
||||
if e = p.store[key]; e != nil {
|
||||
return false, e.add(values)
|
||||
}
|
||||
|
||||
|
@ -241,7 +247,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
|
|||
return false, err
|
||||
}
|
||||
|
||||
p.store[string(key)] = e
|
||||
p.store[key] = e
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ var strSliceRes [][]byte
|
|||
func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
|
||||
// Add some keys
|
||||
for i := 0; i < keys; i++ {
|
||||
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
|
||||
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
|
||||
IntegerValue{
|
||||
unixnano: 1,
|
||||
value: int64(i),
|
||||
|
@ -77,7 +77,7 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
|
|||
// Add some keys
|
||||
for i := 0; i < keys; i++ {
|
||||
vals[i] = []byte(fmt.Sprintf("cpu,host=server-%d field1=value1,field2=value2,field4=value4,field5=value5,field6=value6,field7=value7,field8=value1,field9=value2,field10=value4,field11=value5,field12=value6,field13=value7", i))
|
||||
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
|
||||
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
|
||||
IntegerValue{
|
||||
unixnano: 1,
|
||||
value: int64(i),
|
||||
|
@ -109,7 +109,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
|
|||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < n; j++ {
|
||||
if _, err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil {
|
||||
if _, err := r.write(fmt.Sprintf("cpu,host=server-%d value=1", j), Values{}); err != nil {
|
||||
errC <- err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/estimator/hll"
|
||||
|
@ -141,7 +141,7 @@ type Index struct {
|
|||
path string // Root directory of the index partitions.
|
||||
disableCompactions bool // Initially disables compactions on the index.
|
||||
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
|
||||
maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted
|
||||
maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted.
|
||||
logfileBufferSize int // The size of the buffer used by the LogFile.
|
||||
disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline.
|
||||
logger *zap.Logger // Index's logger.
|
||||
|
@ -398,9 +398,9 @@ func (i *Index) updateMeasurementSketches() error {
|
|||
for j := 0; j < int(i.PartitionN); j++ {
|
||||
if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil {
|
||||
return err
|
||||
} else if i.mSketch.Merge(s); err != nil {
|
||||
} else if err := i.mSketch.Merge(s); err != nil {
|
||||
return err
|
||||
} else if i.mTSketch.Merge(t); err != nil {
|
||||
} else if err := i.mTSketch.Merge(t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -412,9 +412,9 @@ func (i *Index) updateSeriesSketches() error {
|
|||
for j := 0; j < int(i.PartitionN); j++ {
|
||||
if s, t, err := i.partitions[j].SeriesSketches(); err != nil {
|
||||
return err
|
||||
} else if i.sSketch.Merge(s); err != nil {
|
||||
} else if err := i.sSketch.Merge(s); err != nil {
|
||||
return err
|
||||
} else if i.sTSketch.Merge(t); err != nil {
|
||||
} else if err := i.sTSketch.Merge(t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -898,7 +898,7 @@ func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error
|
|||
func (i *Index) DropSeriesGlobal(key []byte) error { return nil }
|
||||
|
||||
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
|
||||
// series for the measurment.
|
||||
// series for the measurement.
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
|
||||
// Check if that was the last series for the measurement in the entire index.
|
||||
if ok, err := i.MeasurementHasSeries(name); err != nil {
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/binaryutil"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
|
@ -401,7 +401,7 @@ func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
|
|||
}
|
||||
|
||||
// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into
|
||||
// dstTags, which is then returened.
|
||||
// dstTags, which is then returned.
|
||||
//
|
||||
// The returned dstTags may have a different length and capacity.
|
||||
func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
|
||||
|
|
Loading…
Reference in New Issue