Improve performance of TSI bloom filter
This commit replaces the previous hashing algorithm used by the pkg.Filter with one based on xxhash. Further, taking from the hashing literature, we can represent k hashes with only two hash function, where previously Filter was using four. Further, unlike `murmur3`, `xxhash` is allocation-free, so allocations have dramatically reduced when inserting and checking for hashes.pull/8857/head
parent
fe960b0f3a
commit
5b7fc517fa
|
@ -30,6 +30,7 @@
|
|||
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.
|
||||
- [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop
|
||||
- [#8830](https://github.com/influxdata/influxdb/issues/8830): Separate importer log statements to stdout and stderr.
|
||||
- [#8857](https://github.com/influxdata/influxdb/pull/8857): Improve performance of Bloom Filter in TSI index.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -2,8 +2,9 @@ package bloom
|
|||
|
||||
// NOTE:
|
||||
// This package implements a limited bloom filter implementation based on
|
||||
// Will Fitzgerald's bloom & bitset packages. It's implemented locally to
|
||||
// support zero-copy memory-mapped slices.
|
||||
// Will Fitzgerald's bloom & bitset packages. It uses a zero-allocation xxhash
|
||||
// implementation, rather than murmur3. It's implemented locally to support
|
||||
// zero-copy memory-mapped slices.
|
||||
//
|
||||
// This also optimizes the filter by always using a bitset size with a power of 2.
|
||||
|
||||
|
@ -11,31 +12,21 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/influxdb/pkg/pool"
|
||||
"github.com/spaolacci/murmur3"
|
||||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
||||
// Filter represents a bloom filter.
|
||||
type Filter struct {
|
||||
k uint64
|
||||
b []byte
|
||||
mask uint64
|
||||
hashPool *pool.Generic
|
||||
k uint64
|
||||
b []byte
|
||||
mask uint64
|
||||
}
|
||||
|
||||
// NewFilter returns a new instance of Filter using m bits and k hash functions.
|
||||
// If m is not a power of two then it is rounded to the next highest power of 2.
|
||||
func NewFilter(m uint64, k uint64) *Filter {
|
||||
m = pow2(m)
|
||||
|
||||
return &Filter{
|
||||
k: k,
|
||||
b: make([]byte, m/8),
|
||||
mask: m - 1,
|
||||
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
|
||||
return murmur3.New128()
|
||||
}),
|
||||
}
|
||||
return &Filter{k: k, b: make([]byte, m/8), mask: m - 1}
|
||||
}
|
||||
|
||||
// NewFilterBuffer returns a new instance of a filter using a backing buffer.
|
||||
|
@ -45,15 +36,7 @@ func NewFilterBuffer(buf []byte, k uint64) (*Filter, error) {
|
|||
if m != uint64(len(buf))*8 {
|
||||
return nil, fmt.Errorf("bloom.Filter: buffer bit count must a power of two: %d/%d", len(buf)*8, m)
|
||||
}
|
||||
|
||||
return &Filter{
|
||||
k: k,
|
||||
b: buf,
|
||||
mask: m - 1,
|
||||
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
|
||||
return murmur3.New128()
|
||||
}),
|
||||
}, nil
|
||||
return &Filter{k: k, b: buf, mask: m - 1}, nil
|
||||
}
|
||||
|
||||
// Len returns the number of bits used in the filter.
|
||||
|
@ -67,7 +50,7 @@ func (f *Filter) Bytes() []byte { return f.b }
|
|||
|
||||
// Clone returns a copy of f.
|
||||
func (f *Filter) Clone() *Filter {
|
||||
other := &Filter{k: f.k, b: make([]byte, len(f.b)), mask: f.mask, hashPool: f.hashPool}
|
||||
other := &Filter{k: f.k, b: make([]byte, len(f.b)), mask: f.mask}
|
||||
copy(other.b, f.b)
|
||||
return other
|
||||
}
|
||||
|
@ -116,21 +99,22 @@ func (f *Filter) Merge(other *Filter) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// location returns the ith hashed location using the four base hash values.
|
||||
func (f *Filter) location(h [4]uint64, i uint64) uint {
|
||||
return uint((h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) & f.mask)
|
||||
// location returns the ith hashed location using two hash values.
|
||||
func (f *Filter) location(h [2]uint64, i uint64) uint {
|
||||
return uint((h[0] + h[1]*i) & f.mask)
|
||||
}
|
||||
|
||||
// hash returns a set of 4 based hashes.
|
||||
func (f *Filter) hash(data []byte) [4]uint64 {
|
||||
h := f.hashPool.Get(0).(murmur3.Hash128)
|
||||
defer f.hashPool.Put(h)
|
||||
h.Reset()
|
||||
h.Write(data)
|
||||
v1, v2 := h.Sum128()
|
||||
h.Write([]byte{1})
|
||||
v3, v4 := h.Sum128()
|
||||
return [4]uint64{v1, v2, v3, v4}
|
||||
// hash returns two 64-bit hashes based on the output of xxhash.
|
||||
func (f *Filter) hash(data []byte) [2]uint64 {
|
||||
v1 := xxhash.Sum64(data)
|
||||
var v2 uint64
|
||||
if len(data) > 0 {
|
||||
b := data[len(data)-1] // We'll put the original byte back.
|
||||
data[len(data)-1] = byte(0)
|
||||
v2 = xxhash.Sum64(data)
|
||||
data[len(data)-1] = b
|
||||
}
|
||||
return [2]uint64{v1, v2}
|
||||
}
|
||||
|
||||
// Estimate returns an estimated bit count and hash count given the element count and false positive rate.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package bloom_test
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
|
@ -9,46 +10,97 @@ import (
|
|||
|
||||
// Ensure filter can insert values and verify they exist.
|
||||
func TestFilter_InsertContains(t *testing.T) {
|
||||
f := bloom.NewFilter(1000, 4)
|
||||
// Short, less comprehensive test.
|
||||
testShortFilter_InsertContains(t)
|
||||
|
||||
// Insert value and validate.
|
||||
f.Insert([]byte("Bess"))
|
||||
if !f.Contains([]byte("Bess")) {
|
||||
t.Fatal("expected true")
|
||||
if testing.Short() {
|
||||
return // Just run the above short test
|
||||
}
|
||||
|
||||
// Insert another value and test.
|
||||
f.Insert([]byte("Emma"))
|
||||
if !f.Contains([]byte("Emma")) {
|
||||
t.Fatal("expected true")
|
||||
// More comprehensive test for the xxhash based Bloom Filter.
|
||||
|
||||
// These parameters will result, for 10M entries, with a bloom filter
|
||||
// with 0.001 false positive rate (1 in 1000 values will be incorrectly
|
||||
// identified as being present in the set).
|
||||
filter := bloom.NewFilter(143775876, 10)
|
||||
v := make([]byte, 4, 4)
|
||||
for i := 0; i < 10000000; i++ {
|
||||
binary.BigEndian.PutUint32(v, uint32(i))
|
||||
filter.Insert(v)
|
||||
}
|
||||
|
||||
// Validate that a non-existent value doesn't exist.
|
||||
if f.Contains([]byte("Jane")) {
|
||||
t.Fatal("expected false")
|
||||
}
|
||||
// None of the values inserted should ever be considered "not possibly in
|
||||
// the filter".
|
||||
t.Run("100M", func(t *testing.T) {
|
||||
for i := 0; i < 10000000; i++ {
|
||||
binary.BigEndian.PutUint32(v, uint32(i))
|
||||
if !filter.Contains(v) {
|
||||
t.Fatalf("got false for value %q, expected true", v)
|
||||
}
|
||||
}
|
||||
|
||||
// If we check for 100,000,000 values that we know are not present in the
|
||||
// filter then we might expect around 100,000 of them to be false positives.
|
||||
var fp int
|
||||
for i := 10000000; i < 110000000; i++ {
|
||||
binary.BigEndian.PutUint32(v, uint32(i))
|
||||
if filter.Contains(v) {
|
||||
fp++
|
||||
}
|
||||
}
|
||||
|
||||
if fp > 1000000 {
|
||||
// If we're an order of magnitude off, then it's arguable that there
|
||||
// is a bug in the bloom filter.
|
||||
t.Fatalf("got %d false positives which is an error rate of %f, expected error rate <=0.001", fp, float64(fp)/100000000)
|
||||
}
|
||||
t.Logf("Bloom false positive error rate was %f", float64(fp)/100000000)
|
||||
})
|
||||
}
|
||||
|
||||
func testShortFilter_InsertContains(t *testing.T) {
|
||||
t.Run("short", func(t *testing.T) {
|
||||
f := bloom.NewFilter(1000, 4)
|
||||
|
||||
// Insert value and validate.
|
||||
f.Insert([]byte("Bess"))
|
||||
if !f.Contains([]byte("Bess")) {
|
||||
t.Fatal("expected true")
|
||||
}
|
||||
|
||||
// Insert another value and test.
|
||||
f.Insert([]byte("Emma"))
|
||||
if !f.Contains([]byte("Emma")) {
|
||||
t.Fatal("expected true")
|
||||
}
|
||||
|
||||
// Validate that a non-existent value doesn't exist.
|
||||
if f.Contains([]byte("Jane")) {
|
||||
t.Fatal("expected false")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
var benchCases = []struct {
|
||||
m, k uint64
|
||||
n int
|
||||
}{
|
||||
{m: 100, k: 4, n: 1000},
|
||||
{m: 1000, k: 4, n: 1000},
|
||||
{m: 10000, k: 4, n: 1000},
|
||||
{m: 100000, k: 4, n: 1000},
|
||||
{m: 100, k: 8, n: 1000},
|
||||
{m: 1000, k: 8, n: 1000},
|
||||
{m: 10000, k: 8, n: 1000},
|
||||
{m: 100000, k: 8, n: 1000},
|
||||
{m: 100, k: 20, n: 1000},
|
||||
{m: 1000, k: 20, n: 1000},
|
||||
{m: 10000, k: 20, n: 1000},
|
||||
{m: 100000, k: 20, n: 1000},
|
||||
}
|
||||
|
||||
func BenchmarkFilter_Insert(b *testing.B) {
|
||||
cases := []struct {
|
||||
m, k uint64
|
||||
n int
|
||||
}{
|
||||
{m: 100, k: 4, n: 1000},
|
||||
{m: 1000, k: 4, n: 1000},
|
||||
{m: 10000, k: 4, n: 1000},
|
||||
{m: 100000, k: 4, n: 1000},
|
||||
{m: 100, k: 8, n: 1000},
|
||||
{m: 1000, k: 8, n: 1000},
|
||||
{m: 10000, k: 8, n: 1000},
|
||||
{m: 100000, k: 8, n: 1000},
|
||||
{m: 100, k: 20, n: 1000},
|
||||
{m: 1000, k: 20, n: 1000},
|
||||
{m: 10000, k: 20, n: 1000},
|
||||
{m: 100000, k: 20, n: 1000},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
for _, c := range benchCases {
|
||||
data := make([][]byte, 0, c.n)
|
||||
for i := 0; i < c.n; i++ {
|
||||
data = append(data, []byte(fmt.Sprintf("%d", i)))
|
||||
|
@ -63,31 +115,14 @@ func BenchmarkFilter_Insert(b *testing.B) {
|
|||
}
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
var okResult bool
|
||||
|
||||
func BenchmarkFilter_Contains(b *testing.B) {
|
||||
cases := []struct {
|
||||
m, k uint64
|
||||
n int
|
||||
}{
|
||||
{m: 100, k: 4, n: 1000},
|
||||
{m: 1000, k: 4, n: 1000},
|
||||
{m: 10000, k: 4, n: 1000},
|
||||
{m: 100000, k: 4, n: 1000},
|
||||
{m: 100, k: 8, n: 1000},
|
||||
{m: 1000, k: 8, n: 1000},
|
||||
{m: 10000, k: 8, n: 1000},
|
||||
{m: 100000, k: 8, n: 1000},
|
||||
{m: 100, k: 20, n: 1000},
|
||||
{m: 1000, k: 20, n: 1000},
|
||||
{m: 10000, k: 20, n: 1000},
|
||||
{m: 100000, k: 20, n: 1000},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
for _, c := range benchCases {
|
||||
data := make([][]byte, 0, c.n)
|
||||
notData := make([][]byte, 0, c.n)
|
||||
for i := 0; i < c.n; i++ {
|
||||
|
@ -120,25 +155,7 @@ func BenchmarkFilter_Contains(b *testing.B) {
|
|||
}
|
||||
|
||||
func BenchmarkFilter_Merge(b *testing.B) {
|
||||
cases := []struct {
|
||||
m, k uint64
|
||||
n int
|
||||
}{
|
||||
{m: 100, k: 4, n: 1000},
|
||||
{m: 1000, k: 4, n: 1000},
|
||||
{m: 10000, k: 4, n: 1000},
|
||||
{m: 100000, k: 4, n: 1000},
|
||||
{m: 100, k: 8, n: 1000},
|
||||
{m: 1000, k: 8, n: 1000},
|
||||
{m: 10000, k: 8, n: 1000},
|
||||
{m: 100000, k: 8, n: 1000},
|
||||
{m: 100, k: 20, n: 1000},
|
||||
{m: 1000, k: 20, n: 1000},
|
||||
{m: 10000, k: 20, n: 1000},
|
||||
{m: 100000, k: 20, n: 1000},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
for _, c := range benchCases {
|
||||
data1 := make([][]byte, 0, c.n)
|
||||
data2 := make([][]byte, 0, c.n)
|
||||
for i := 0; i < c.n; i++ {
|
||||
|
|
Loading…
Reference in New Issue