Initial import of the pkg package

pull/10616/head
Edd Robinson 2018-09-26 14:59:04 +01:00
parent f0ba72f227
commit fb0db04bc1
34 changed files with 3281 additions and 12 deletions

go.mod (3 changed lines)

@@ -21,6 +21,7 @@ require (
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e // indirect
github.com/cespare/xxhash v1.1.0
github.com/coreos/bbolt v1.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0
@@ -104,7 +105,7 @@ require (
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20180920110915-d641721ec2de // indirect
golang.org/x/text v0.3.0
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e // indirect
google.golang.org/api v0.0.0-20180723152133-cd7aead8ef37
google.golang.org/appengine v1.0.0 // indirect


@@ -15,7 +15,7 @@ import (
"unicode"
"unicode/utf8"
"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/platform/pkg/escape"
)
type escapeSet struct {


@@ -0,0 +1,22 @@
package binaryutil
// VarintSize returns the number of bytes to varint encode x.
// This code is copied from encoding/binary.PutVarint() with the buffer removed.
func VarintSize(x int64) int {
ux := uint64(x) << 1
if x < 0 {
ux = ^ux
}
return UvarintSize(ux)
}
// UvarintSize returns the number of bytes to uvarint encode x.
// This code is copied from encoding/binary.PutUvarint() with the buffer removed.
func UvarintSize(x uint64) int {
i := 0
for x >= 0x80 {
x >>= 7
i++
}
return i + 1
}
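
A minimal sketch (not part of this diff) showing that these size helpers agree with what encoding/binary actually writes; the import path pkg/binaryutil is assumed from the package name, since the file path is not shown in this hunk:

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/influxdata/platform/pkg/binaryutil" // assumed path
)

func main() {
    for _, v := range []int64{0, -1, 127, 128, -123456789} {
        buf := make([]byte, binary.MaxVarintLen64)
        n := binary.PutVarint(buf, v)
        // VarintSize should report the same byte count that PutVarint used.
        fmt.Printf("%d: PutVarint wrote %d bytes, VarintSize says %d\n",
            v, n, binaryutil.VarintSize(v))
    }
}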

pkg/bytesutil/bytesutil.go (new file, 195 lines)

@@ -0,0 +1,195 @@
package bytesutil
import (
"bytes"
"fmt"
"sort"
)
// Sort sorts a slice of byte slices.
func Sort(a [][]byte) {
sort.Sort(byteSlices(a))
}
// SortDedup sorts the byte slice a and removes duplicates. The returned slice is a subslice of a.
func SortDedup(a [][]byte) [][]byte {
if len(a) < 2 {
return a
}
Sort(a)
i, j := 0, 1
for j < len(a) {
if !bytes.Equal(a[j-1], a[j]) {
a[i] = a[j-1]
i++
}
j++
}
a[i] = a[j-1]
i++
return a[:i]
}
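// IsSorted returns true if a is sorted in ascending order.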
func IsSorted(a [][]byte) bool {
return sort.IsSorted(byteSlices(a))
}
// SearchBytes performs a binary search for x in the sorted slice a.
func SearchBytes(a [][]byte, x []byte) int {
// Define f(i) => bytes.Compare(a[i], x) < 0
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := 0, len(a)
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
if bytes.Compare(a[h], x) < 0 {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i
}
// Contains returns true if x is an element of the sorted slice a.
func Contains(a [][]byte, x []byte) bool {
n := SearchBytes(a, x)
return n < len(a) && bytes.Equal(a[n], x)
}
// SearchBytesFixed performs a binary search on a, treating it as a sequence of
// fixed-size entries of sz bytes each; len(a) must be a multiple of sz or the
// function panics. The returned value is the offset within a at which the sought
// entry should exist. The caller should verify that it actually does exist there.
func SearchBytesFixed(a []byte, sz int, fn func(x []byte) bool) int {
if len(a)%sz != 0 {
panic(fmt.Sprintf("x is not a multiple of a: %d %d", len(a), sz))
}
i, j := 0, len(a)-sz
for i < j {
h := int(uint(i+j) >> 1)
h -= h % sz
if !fn(a[h : h+sz]) {
i = h + sz
} else {
j = h
}
}
return i
}
// Union returns the union of a & b in sorted order.
func Union(a, b [][]byte) [][]byte {
n := len(b)
if len(a) > len(b) {
n = len(a)
}
other := make([][]byte, 0, n)
for {
if len(a) > 0 && len(b) > 0 {
if cmp := bytes.Compare(a[0], b[0]); cmp == 0 {
other, a, b = append(other, a[0]), a[1:], b[1:]
} else if cmp == -1 {
other, a = append(other, a[0]), a[1:]
} else {
other, b = append(other, b[0]), b[1:]
}
} else if len(a) > 0 {
other, a = append(other, a[0]), a[1:]
} else if len(b) > 0 {
other, b = append(other, b[0]), b[1:]
} else {
return other
}
}
}
// Intersect returns the intersection of a & b in sorted order.
func Intersect(a, b [][]byte) [][]byte {
n := len(b)
if len(a) > len(b) {
n = len(a)
}
other := make([][]byte, 0, n)
for len(a) > 0 && len(b) > 0 {
if cmp := bytes.Compare(a[0], b[0]); cmp == 0 {
other, a, b = append(other, a[0]), a[1:], b[1:]
} else if cmp == -1 {
a = a[1:]
} else {
b = b[1:]
}
}
return other
}
// Clone returns a copy of b.
func Clone(b []byte) []byte {
if b == nil {
return nil
}
buf := make([]byte, len(b))
copy(buf, b)
return buf
}
// CloneSlice returns a copy of a slice of byte slices.
func CloneSlice(a [][]byte) [][]byte {
other := make([][]byte, len(a))
for i := range a {
other[i] = Clone(a[i])
}
return other
}
// Pack converts a sparse array to a dense one. It removes sections of a containing
// runs of val of length width. The returned value is a subslice of a.
func Pack(a []byte, width int, val byte) []byte {
var i, j, jStart, end int
fill := make([]byte, width)
for i := 0; i < len(fill); i++ {
fill[i] = val
}
// Skip the first run that won't move
for ; i < len(a) && a[i] != val; i += width {
}
end = i
for i < len(a) {
// Find the next gap to remove
for i < len(a) && a[i] == val {
i += width
}
// Find the next non-gap to keep
jStart = i
for j = i; j < len(a) && a[j] != val; j += width {
}
if jStart == len(a) {
break
}
// Move the non-gap over the section to remove.
copy(a[end:], a[jStart:j])
end += j - jStart
i = j
}
return a[:end]
}
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
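
A minimal usage sketch for the package above (not part of this diff), showing sort/dedup, search and union on a slice of byte slices:

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/bytesutil"
)

func main() {
    a := [][]byte{[]byte("ccc"), []byte("aaa"), []byte("ccc"), []byte("bbb")}
    a = bytesutil.SortDedup(a) // ["aaa" "bbb" "ccc"]

    fmt.Println(bytesutil.Contains(a, []byte("bbb")))    // true
    fmt.Println(bytesutil.SearchBytes(a, []byte("bbc"))) // 2: the insertion point
    for _, s := range bytesutil.Union(a, [][]byte{[]byte("abc")}) {
        fmt.Printf("%s ", s) // aaa abc bbb ccc
    }
}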


@@ -0,0 +1,281 @@
package bytesutil_test
import (
"bytes"
"encoding/binary"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/pkg/bytesutil"
)
func TestSearchBytesFixed(t *testing.T) {
n, sz := 5, 8
a := make([]byte, n*sz) // 5 int64s, 8 bytes each
for i := 0; i < 5; i++ {
binary.BigEndian.PutUint64(a[i*sz:i*sz+sz], uint64(i))
}
var x [8]byte
for i := 0; i < n; i++ {
binary.BigEndian.PutUint64(x[:], uint64(i))
if exp, got := i*sz, bytesutil.SearchBytesFixed(a, len(x), func(v []byte) bool {
return bytes.Compare(v, x[:]) >= 0
}); exp != got {
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
}
}
if exp, got := len(a)-1, bytesutil.SearchBytesFixed(a, 1, func(v []byte) bool {
return bytes.Compare(v, []byte{99}) >= 0
}); exp != got {
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
}
}
func TestSearchBytes(t *testing.T) {
in := toByteSlices("bbb", "ccc", "eee", "fff", "ggg", "hhh")
tests := []struct {
name string
x string
exp int
}{
{"exists first", "bbb", 0},
{"exists middle", "eee", 2},
{"exists last", "hhh", 5},
{"not exists last", "zzz", 6},
{"not exists first", "aaa", 0},
{"not exists mid", "ddd", 2},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := bytesutil.SearchBytes(in, []byte(test.x))
if got != test.exp {
t.Errorf("got %d, expected %d", got, test.exp)
}
})
}
}
func TestContains(t *testing.T) {
in := toByteSlices("bbb", "ccc", "eee", "fff", "ggg", "hhh")
tests := []struct {
name string
x string
exp bool
}{
{"exists first", "bbb", true},
{"exists middle", "eee", true},
{"exists last", "hhh", true},
{"not exists last", "zzz", false},
{"not exists first", "aaa", false},
{"not exists mid", "ddd", false},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := bytesutil.Contains(in, []byte(test.x))
if got != test.exp {
t.Errorf("got %t, expected %t", got, test.exp)
}
})
}
}
func toByteSlices(s ...string) [][]byte {
r := make([][]byte, len(s))
for i, v := range s {
r[i] = []byte(v)
}
return r
}
func TestSortDedup(t *testing.T) {
tests := []struct {
name string
in [][]byte
exp [][]byte
}{
{
name: "mixed dupes",
in: toByteSlices("bbb", "aba", "bbb", "aba", "ccc", "bbb", "aba"),
exp: toByteSlices("aba", "bbb", "ccc"),
},
{
name: "no dupes",
in: toByteSlices("bbb", "ccc", "ddd"),
exp: toByteSlices("bbb", "ccc", "ddd"),
},
{
name: "dupe at end",
in: toByteSlices("ccc", "ccc", "aaa"),
exp: toByteSlices("aaa", "ccc"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
out := bytesutil.SortDedup(test.in)
if !cmp.Equal(out, test.exp) {
t.Error("invalid result")
}
})
}
}
func TestPack_WidthOne_One(t *testing.T) {
a := make([]byte, 8)
a[4] = 1
a = bytesutil.Pack(a, 1, 0)
if got, exp := len(a), 1; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}
for i, v := range []byte{1} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
func TestPack_WidthOne_Two(t *testing.T) {
a := make([]byte, 8)
a[4] = 1
a[6] = 2
a = bytesutil.Pack(a, 1, 0)
if got, exp := len(a), 2; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}
for i, v := range []byte{1, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
func TestPack_WidthTwo_Two(t *testing.T) {
a := make([]byte, 8)
a[2] = 1
a[3] = 1
a[6] = 2
a[7] = 2
a = bytesutil.Pack(a, 2, 0)
if got, exp := len(a), 4; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}
for i, v := range []byte{1, 1, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
func TestPack_WidthOne_Last(t *testing.T) {
a := make([]byte, 8)
a[6] = 2
a[7] = 2
a = bytesutil.Pack(a, 2, 255)
if got, exp := len(a), 8; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}
for i, v := range []byte{0, 0, 0, 0, 0, 0, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
func TestPack_WidthOne_LastFill(t *testing.T) {
a := make([]byte, 8)
a[0] = 255
a[1] = 255
a[2] = 2
a[3] = 2
a[4] = 2
a[5] = 2
a[6] = 2
a[7] = 2
a = bytesutil.Pack(a, 2, 255)
if got, exp := len(a), 6; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}
for i, v := range []byte{2, 2, 2, 2, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
var result [][]byte
func BenchmarkSortDedup(b *testing.B) {
b.Run("sort-deduplicate", func(b *testing.B) {
data := toByteSlices("bbb", "aba", "bbb", "aba", "ccc", "bbb", "aba")
in := append([][]byte{}, data...)
b.ReportAllocs()
copy(in, data)
for i := 0; i < b.N; i++ {
result = bytesutil.SortDedup(in)
b.StopTimer()
copy(in, data)
b.StartTimer()
}
})
}
func BenchmarkContains_True(b *testing.B) {
var in [][]byte
for i := 'a'; i <= 'z'; i++ {
in = append(in, []byte(strings.Repeat(string(i), 3)))
}
for i := 0; i < b.N; i++ {
bytesutil.Contains(in, []byte("xxx"))
}
}
func BenchmarkContains_False(b *testing.B) {
var in [][]byte
for i := 'a'; i <= 'z'; i++ {
in = append(in, []byte(strings.Repeat(string(i), 3)))
}
for i := 0; i < b.N; i++ {
bytesutil.Contains(in, []byte("a"))
}
}
func BenchmarkSearchBytes_Exists(b *testing.B) {
var in [][]byte
for i := 'a'; i <= 'z'; i++ {
in = append(in, []byte(strings.Repeat(string(i), 3)))
}
for i := 0; i < b.N; i++ {
bytesutil.SearchBytes(in, []byte("xxx"))
}
}
func BenchmarkSearchBytes_NotExists(b *testing.B) {
var in [][]byte
for i := 'a'; i <= 'z'; i++ {
in = append(in, []byte(strings.Repeat(string(i), 3)))
}
for i := 0; i < b.N; i++ {
bytesutil.SearchBytes(in, []byte("a"))
}
}

pkg/escape/bytes.go (new file, 115 lines)

@@ -0,0 +1,115 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/platform/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
// The output size will be no more than inLen. Preallocating the
// capacity of the output is faster and uses less memory than
// letting append() do its own (over)allocation.
out := make([]byte, 0, inLen)
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}
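
A small sketch (not part of this diff) of the byte-level escaping round trip for line-protocol style input:

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/escape"
)

func main() {
    in := []byte(`cpu,host=server 01`)
    esc := escape.Bytes(in) // commas, spaces and = are backslash-escaped
    fmt.Printf("%s\n", esc)                  // cpu\,host\=server\ 01
    fmt.Println(escape.IsEscaped(esc))       // true
    fmt.Printf("%s\n", escape.Unescape(esc)) // cpu,host=server 01
}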

pkg/escape/bytes_test.go (new file, 139 lines)

@@ -0,0 +1,139 @@
package escape
import (
"bytes"
"reflect"
"strings"
"testing"
)
var result []byte
func BenchmarkBytesEscapeNoEscapes(b *testing.B) {
buf := []byte(`no_escapes`)
for i := 0; i < b.N; i++ {
result = Bytes(buf)
}
}
func BenchmarkUnescapeNoEscapes(b *testing.B) {
buf := []byte(`no_escapes`)
for i := 0; i < b.N; i++ {
result = Unescape(buf)
}
}
func BenchmarkBytesEscapeMany(b *testing.B) {
tests := [][]byte{
[]byte("this is my special string"),
[]byte("a field w=i th == tons of escapes"),
[]byte("some,commas,here"),
}
for n := 0; n < b.N; n++ {
for _, test := range tests {
result = Bytes(test)
}
}
}
func BenchmarkUnescapeMany(b *testing.B) {
tests := [][]byte{
[]byte(`this\ is\ my\ special\ string`),
[]byte(`a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`),
[]byte(`some\,commas\,here`),
}
for i := 0; i < b.N; i++ {
for _, test := range tests {
result = Unescape(test)
}
}
}
var boolResult bool
func BenchmarkIsEscaped(b *testing.B) {
tests := [][]byte{
[]byte(`no_escapes`),
[]byte(`a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`),
[]byte(`some\,commas\,here`),
}
for i := 0; i < b.N; i++ {
for _, test := range tests {
boolResult = IsEscaped(test)
}
}
}
func BenchmarkAppendUnescaped(b *testing.B) {
tests := [][]byte{
[]byte(`this\ is\ my\ special\ string`),
[]byte(`a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`),
[]byte(`some\,commas\,here`),
}
for i := 0; i < b.N; i++ {
result = nil
for _, test := range tests {
result = AppendUnescaped(result, test)
}
}
}
func TestUnescape(t *testing.T) {
tests := []struct {
in []byte
out []byte
}{
{
[]byte(nil),
[]byte(nil),
},
{
[]byte(""),
[]byte(nil),
},
{
[]byte("\\,\\\"\\ \\="),
[]byte(",\" ="),
},
{
[]byte("\\\\"),
[]byte("\\\\"),
},
{
[]byte("plain and simple"),
[]byte("plain and simple"),
},
}
for ii, tt := range tests {
got := Unescape(tt.in)
if !reflect.DeepEqual(got, tt.out) {
t.Errorf("[%d] Unescape(%#v) = %#v, expected %#v", ii, string(tt.in), string(got), string(tt.out))
}
}
}
func TestAppendUnescaped(t *testing.T) {
cases := strings.Split(strings.TrimSpace(`
normal
inv\alid
goo\"d
sp\ ace
\,\"\ \=
f\\\ x
`), "\n")
for _, c := range cases {
exp := Unescape([]byte(c))
got := AppendUnescaped(nil, []byte(c))
if !bytes.Equal(got, exp) {
t.Errorf("AppendUnescaped failed for %#q: got %#q, exp %#q", c, got, exp)
}
}
}

pkg/escape/strings.go (new file, 21 lines)

@@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns the unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}
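
The string variants behave the same way as the byte helpers; a tiny sketch (not part of this diff):

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/escape"
)

func main() {
    s := escape.String("region us,west") // region\ us\,west
    fmt.Println(s)
    fmt.Println(escape.UnescapeString(s)) // region us,west
}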

pkg/escape/strings_test.go (new file, 115 lines)

@@ -0,0 +1,115 @@
package escape
import (
"testing"
)
var s string
func BenchmarkStringEscapeNoEscapes(b *testing.B) {
for n := 0; n < b.N; n++ {
s = String("no_escapes")
}
}
func BenchmarkStringUnescapeNoEscapes(b *testing.B) {
for n := 0; n < b.N; n++ {
s = UnescapeString("no_escapes")
}
}
func BenchmarkManyStringEscape(b *testing.B) {
tests := []string{
"this is my special string",
"a field w=i th == tons of escapes",
"some,commas,here",
}
for n := 0; n < b.N; n++ {
for _, test := range tests {
s = String(test)
}
}
}
func BenchmarkManyStringUnescape(b *testing.B) {
tests := []string{
`this\ is\ my\ special\ string`,
`a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`,
`some\,commas\,here`,
}
for n := 0; n < b.N; n++ {
for _, test := range tests {
s = UnescapeString(test)
}
}
}
func TestStringEscape(t *testing.T) {
tests := []struct {
in string
expected string
}{
{
in: "",
expected: "",
},
{
in: "this is my special string",
expected: `this\ is\ my\ special\ string`,
},
{
in: "a field w=i th == tons of escapes",
expected: `a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`,
},
{
in: "no_escapes",
expected: "no_escapes",
},
{
in: "some,commas,here",
expected: `some\,commas\,here`,
},
}
for _, test := range tests {
if test.expected != String(test.in) {
t.Errorf("Got %s, expected %s", String(test.in), test.expected)
}
}
}
func TestStringUnescape(t *testing.T) {
tests := []struct {
in string
expected string
}{
{
in: "",
expected: "",
},
{
in: `this\ is\ my\ special\ string`,
expected: "this is my special string",
},
{
in: `a\ field\ w\=i\ th\ \=\=\ tons\ of\ escapes`,
expected: "a field w=i th == tons of escapes",
},
{
in: "no_escapes",
expected: "no_escapes",
},
{
in: `some\,commas\,here`,
expected: "some,commas,here",
},
}
for _, test := range tests {
if test.expected != UnescapeString(test.in) {
t.Errorf("Got %s, expected %s", UnescapeString(test.in), test.expected)
}
}
}


@@ -0,0 +1,173 @@
package hll
import "encoding/binary"
// Original author of this file is github.com/clarkduvall/hyperloglog
type iterable interface {
decode(i int, last uint32) (uint32, int)
Len() int
Iter() *iterator
}
type iterator struct {
i int
last uint32
v iterable
}
func (iter *iterator) Next() uint32 {
n, i := iter.v.decode(iter.i, iter.last)
iter.last = n
iter.i = i
return n
}
func (iter *iterator) Peek() uint32 {
n, _ := iter.v.decode(iter.i, iter.last)
return n
}
func (iter iterator) HasNext() bool {
return iter.i < iter.v.Len()
}
type compressedList struct {
count uint32
last uint32
b variableLengthList
}
func (v *compressedList) Clone() *compressedList {
if v == nil {
return nil
}
newV := &compressedList{
count: v.count,
last: v.last,
}
newV.b = make(variableLengthList, len(v.b))
copy(newV.b, v.b)
return newV
}
func (v *compressedList) MarshalBinary() (data []byte, err error) {
// Marshal the variableLengthList
bdata, err := v.b.MarshalBinary()
if err != nil {
return nil, err
}
// 8 bytes for the two fixed-size values, plus the size of bdata.
data = make([]byte, 0, 4+4+len(bdata))
// Marshal the count and last values.
data = append(data, []byte{
// Number of items in the list.
byte(v.count >> 24),
byte(v.count >> 16),
byte(v.count >> 8),
byte(v.count),
// The last item in the list.
byte(v.last >> 24),
byte(v.last >> 16),
byte(v.last >> 8),
byte(v.last),
}...)
// Append the list
return append(data, bdata...), nil
}
func (v *compressedList) UnmarshalBinary(data []byte) error {
// Set the count.
v.count, data = binary.BigEndian.Uint32(data[:4]), data[4:]
// Set the last value.
v.last, data = binary.BigEndian.Uint32(data[:4]), data[4:]
// Set the list.
sz, data := binary.BigEndian.Uint32(data[:4]), data[4:]
v.b = make([]uint8, sz)
for i := uint32(0); i < sz; i++ {
v.b[i] = uint8(data[i])
}
return nil
}
func newCompressedList(size int) *compressedList {
v := &compressedList{}
v.b = make(variableLengthList, 0, size)
return v
}
func (v *compressedList) Len() int {
return len(v.b)
}
func (v *compressedList) decode(i int, last uint32) (uint32, int) {
n, i := v.b.decode(i, last)
return n + last, i
}
func (v *compressedList) Append(x uint32) {
v.count++
v.b = v.b.Append(x - v.last)
v.last = x
}
func (v *compressedList) Iter() *iterator {
return &iterator{0, 0, v}
}
type variableLengthList []uint8
func (v variableLengthList) MarshalBinary() (data []byte, err error) {
// 4 bytes for the size of the list, and a byte for each element in the
// list.
data = make([]byte, 0, 4+v.Len())
// Length of the list. We only need 32 bits because the size of the list
// couldn't exceed that on 32 bit architectures.
sz := v.Len()
data = append(data, []byte{
byte(sz >> 24),
byte(sz >> 16),
byte(sz >> 8),
byte(sz),
}...)
// Marshal each element in the list.
for i := 0; i < sz; i++ {
data = append(data, byte(v[i]))
}
return data, nil
}
func (v variableLengthList) Len() int {
return len(v)
}
func (v *variableLengthList) Iter() *iterator {
return &iterator{0, 0, v}
}
func (v variableLengthList) decode(i int, last uint32) (uint32, int) {
var x uint32
j := i
for ; v[j]&0x80 != 0; j++ {
x |= uint32(v[j]&0x7f) << (uint(j-i) * 7)
}
x |= uint32(v[j]) << (uint(j-i) * 7)
return x, j + 1
}
func (v variableLengthList) Append(x uint32) variableLengthList {
for x&0xffffff80 != 0 {
v = append(v, uint8((x&0x7f)|0x80))
x >>= 7
}
return append(v, uint8(x&0x7f))
}
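
compressedList stores each appended value as the delta from the previous value, encoded as a 7-bits-per-byte varint (the same scheme encoding/binary uses). A standalone sketch of that idea using only the standard library, since the types above are unexported:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // Sorted values, as mergeSparse would produce them.
    vals := []uint32{10, 12, 300, 305}

    var out []byte
    var last uint32
    for _, v := range vals {
        var buf [binary.MaxVarintLen32]byte
        n := binary.PutUvarint(buf[:], uint64(v-last)) // delta, then varint
        out = append(out, buf[:n]...)
        last = v
    }
    // Small deltas between sorted values encode in a single byte each.
    fmt.Println(len(out), out)
}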

pkg/estimator/hll/hll.go (new file, 495 lines)

@@ -0,0 +1,495 @@
// Package hll contains a HyperLogLog++ implementation with LogLog-Beta bias
// correction that is adapted (mostly copied) from the implementation provided
// by Clark DuVall at github.com/clarkduvall/hyperloglog.
//
// The differences are that the implementation in this package:
//
// * uses an AMD64 optimised xxhash algorithm instead of murmur;
// * uses some AMD64 optimisations for things like clz;
// * works with []byte rather than a Hash64 interface, to reduce allocations;
// * implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler
//
// Based on some rough benchmarking, this implementation of HyperLogLog++ is
// around twice as fast as the github.com/clarkduvall/hyperloglog implementation.
package hll
import (
"encoding/binary"
"errors"
"fmt"
"math"
"math/bits"
"sort"
"unsafe"
"github.com/cespare/xxhash"
"github.com/influxdata/platform/pkg/estimator"
)
// Current version of HLL implementation.
const version uint8 = 2
// DefaultPrecision is the default precision.
const DefaultPrecision = 16
func beta(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.37331876643753059*ez +
-1.41704077448122989*zl +
0.40729184796612533*math.Pow(zl, 2) +
1.56152033906584164*math.Pow(zl, 3) +
-0.99242233534286128*math.Pow(zl, 4) +
0.26064681399483092*math.Pow(zl, 5) +
-0.03053811369682807*math.Pow(zl, 6) +
0.00155770210179105*math.Pow(zl, 7)
}
// Plus implements the Hyperloglog++ algorithm, described in the following
// paper: http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
//
// The HyperLogLog++ algorithm provides cardinality estimations.
type Plus struct {
// hash function used to hash values to add to the sketch.
hash func([]byte) uint64
p uint8 // precision.
pp uint8 // p' (sparse) precision to be used when p ∈ [4..pp] and pp < 64.
m uint32 // Number of substreams used for stochastic averaging of the stream.
mp uint32 // m' (sparse) number of substreams.
alpha float64 // alpha is used for bias correction.
sparse bool // Should we use a sparse sketch representation.
tmpSet set
denseList []uint8 // The dense representation of the HLL.
sparseList *compressedList // values that can be stored in the sparse representation.
}
// NewPlus returns a new Plus with precision p. p must be between 4 and 18.
func NewPlus(p uint8) (*Plus, error) {
if p > 18 || p < 4 {
return nil, errors.New("precision must be between 4 and 18")
}
// p' = 25 is used in the Google paper.
pp := uint8(25)
hll := &Plus{
hash: xxhash.Sum64,
p: p,
pp: pp,
m: 1 << p,
mp: 1 << pp,
tmpSet: set{},
sparse: true,
}
hll.sparseList = newCompressedList(int(hll.m))
// Determine alpha.
switch hll.m {
case 16:
hll.alpha = 0.673
case 32:
hll.alpha = 0.697
case 64:
hll.alpha = 0.709
default:
hll.alpha = 0.7213 / (1 + 1.079/float64(hll.m))
}
return hll, nil
}
// Bytes estimates the memory footprint of this Plus, in bytes.
func (h *Plus) Bytes() int {
var b int
b += len(h.tmpSet) * 4
b += cap(h.denseList)
if h.sparseList != nil {
b += int(unsafe.Sizeof(*h.sparseList))
b += cap(h.sparseList.b)
}
b += int(unsafe.Sizeof(*h))
return b
}
// NewDefaultPlus creates a new Plus with the default precision.
func NewDefaultPlus() *Plus {
p, err := NewPlus(DefaultPrecision)
if err != nil {
panic(err)
}
return p
}
// Clone returns a deep copy of h.
func (h *Plus) Clone() estimator.Sketch {
var hll = &Plus{
hash: h.hash,
p: h.p,
pp: h.pp,
m: h.m,
mp: h.mp,
alpha: h.alpha,
sparse: h.sparse,
tmpSet: h.tmpSet.Clone(),
sparseList: h.sparseList.Clone(),
}
hll.denseList = make([]uint8, len(h.denseList))
copy(hll.denseList, h.denseList)
return hll
}
// Add adds a new value to the HLL.
func (h *Plus) Add(v []byte) {
x := h.hash(v)
if h.sparse {
h.tmpSet.add(h.encodeHash(x))
if uint32(len(h.tmpSet))*100 > h.m {
h.mergeSparse()
if uint32(h.sparseList.Len()) > h.m {
h.toNormal()
}
}
} else {
i := bextr(x, 64-h.p, h.p) // {x63,...,x64-p}
w := x<<h.p | 1<<(h.p-1) // {x63-p,...,x0}
rho := uint8(bits.LeadingZeros64(w)) + 1
if rho > h.denseList[i] {
h.denseList[i] = rho
}
}
}
// Count returns a cardinality estimate.
func (h *Plus) Count() uint64 {
if h == nil {
return 0 // Nothing to do.
}
if h.sparse {
h.mergeSparse()
return uint64(h.linearCount(h.mp, h.mp-uint32(h.sparseList.count)))
}
sum := 0.0
m := float64(h.m)
var count float64
for _, val := range h.denseList {
sum += 1.0 / float64(uint32(1)<<val)
if val == 0 {
count++
}
}
// Use LogLog-Beta bias estimation
return uint64((h.alpha * m * (m - count) / (beta(count) + sum)) + 0.5)
}
// Merge takes another HyperLogLogPlus and combines it with HyperLogLogPlus h.
// If HyperLogLogPlus h is using the sparse representation, it will be converted
// to the normal representation.
func (h *Plus) Merge(s estimator.Sketch) error {
if s == nil {
// Nothing to do
return nil
}
other, ok := s.(*Plus)
if !ok {
return fmt.Errorf("wrong type for merging: %T", other)
}
if h.p != other.p {
return errors.New("precisions must be equal")
}
if h.sparse {
h.toNormal()
}
if other.sparse {
for k := range other.tmpSet {
i, r := other.decodeHash(k)
if h.denseList[i] < r {
h.denseList[i] = r
}
}
for iter := other.sparseList.Iter(); iter.HasNext(); {
i, r := other.decodeHash(iter.Next())
if h.denseList[i] < r {
h.denseList[i] = r
}
}
} else {
for i, v := range other.denseList {
if v > h.denseList[i] {
h.denseList[i] = v
}
}
}
return nil
}
// MarshalBinary implements the encoding.BinaryMarshaler interface.
func (h *Plus) MarshalBinary() (data []byte, err error) {
if h == nil {
return nil, nil
}
// Marshal a version marker.
data = append(data, version)
// Marshal precision.
data = append(data, byte(h.p))
if h.sparse {
// It's using the sparse representation.
data = append(data, byte(1))
// Add the tmp_set
tsdata, err := h.tmpSet.MarshalBinary()
if err != nil {
return nil, err
}
data = append(data, tsdata...)
// Add the sparse representation
sdata, err := h.sparseList.MarshalBinary()
if err != nil {
return nil, err
}
return append(data, sdata...), nil
}
// It's using the dense representation.
data = append(data, byte(0))
// Add the dense sketch representation.
sz := len(h.denseList)
data = append(data, []byte{
byte(sz >> 24),
byte(sz >> 16),
byte(sz >> 8),
byte(sz),
}...)
// Marshal each element in the list.
for i := 0; i < len(h.denseList); i++ {
data = append(data, byte(h.denseList[i]))
}
return data, nil
}
// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
func (h *Plus) UnmarshalBinary(data []byte) error {
if len(data) < 12 {
return fmt.Errorf("provided buffer %v too short for initializing HLL sketch", data)
}
// Unmarshal version. We may need this in the future if we make
// non-compatible changes.
_ = data[0]
// Unmarshal precision.
p := uint8(data[1])
newh, err := NewPlus(p)
if err != nil {
return err
}
*h = *newh
// h is now initialised with the correct precision. We just need to fill the
// rest of the details out.
if data[2] == byte(1) {
// Using the sparse representation.
h.sparse = true
// Unmarshal the tmp_set.
tssz := binary.BigEndian.Uint32(data[3:7])
h.tmpSet = make(map[uint32]struct{}, tssz)
// We need to unmarshal tssz values in total, and each value requires us
// to read 4 bytes.
tsLastByte := int((tssz * 4) + 7)
for i := 7; i < tsLastByte; i += 4 {
k := binary.BigEndian.Uint32(data[i : i+4])
h.tmpSet[k] = struct{}{}
}
// Unmarshal the sparse representation.
return h.sparseList.UnmarshalBinary(data[tsLastByte:])
}
// Using the dense representation.
h.sparse = false
dsz := int(binary.BigEndian.Uint32(data[3:7]))
h.denseList = make([]uint8, 0, dsz)
for i := 7; i < dsz+7; i++ {
h.denseList = append(h.denseList, uint8(data[i]))
}
return nil
}
func (h *Plus) mergeSparse() {
if len(h.tmpSet) == 0 {
return
}
keys := make(uint64Slice, 0, len(h.tmpSet))
for k := range h.tmpSet {
keys = append(keys, k)
}
sort.Sort(keys)
newList := newCompressedList(int(h.m))
for iter, i := h.sparseList.Iter(), 0; iter.HasNext() || i < len(keys); {
if !iter.HasNext() {
newList.Append(keys[i])
i++
continue
}
if i >= len(keys) {
newList.Append(iter.Next())
continue
}
x1, x2 := iter.Peek(), keys[i]
if x1 == x2 {
newList.Append(iter.Next())
i++
} else if x1 > x2 {
newList.Append(x2)
i++
} else {
newList.Append(iter.Next())
}
}
h.sparseList = newList
h.tmpSet = set{}
}
// Convert from sparse representation to dense representation.
func (h *Plus) toNormal() {
if len(h.tmpSet) > 0 {
h.mergeSparse()
}
h.denseList = make([]uint8, h.m)
for iter := h.sparseList.Iter(); iter.HasNext(); {
i, r := h.decodeHash(iter.Next())
if h.denseList[i] < r {
h.denseList[i] = r
}
}
h.sparse = false
h.tmpSet = nil
h.sparseList = nil
}
// Encode a hash to be used in the sparse representation.
func (h *Plus) encodeHash(x uint64) uint32 {
idx := uint32(bextr(x, 64-h.pp, h.pp))
if bextr(x, 64-h.pp, h.pp-h.p) == 0 {
zeros := bits.LeadingZeros64((bextr(x, 0, 64-h.pp)<<h.pp)|(1<<h.pp-1)) + 1
return idx<<7 | uint32(zeros<<1) | 1
}
return idx << 1
}
// Decode a hash from the sparse representation.
func (h *Plus) decodeHash(k uint32) (uint32, uint8) {
var r uint8
if k&1 == 1 {
r = uint8(bextr32(k, 1, 6)) + h.pp - h.p
} else {
r = uint8(bits.LeadingZeros32(k<<(32-h.pp+h.p-1)) + 1)
}
return h.getIndex(k), r
}
func (h *Plus) getIndex(k uint32) uint32 {
if k&1 == 1 {
return bextr32(k, 32-h.p, h.p)
}
return bextr32(k, h.pp-h.p+1, h.p)
}
func (h *Plus) linearCount(m uint32, v uint32) float64 {
fm := float64(m)
return fm * math.Log(fm/float64(v))
}
type uint64Slice []uint32
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type set map[uint32]struct{}
func (s set) Clone() set {
if s == nil {
return nil
}
newS := make(map[uint32]struct{}, len(s))
for k, v := range s {
newS[k] = v
}
return newS
}
func (s set) MarshalBinary() (data []byte, err error) {
// 4 bytes for the size of the set, and 4 bytes for each key.
// list.
data = make([]byte, 0, 4+(4*len(s)))
// Length of the set. We only need 32 bits because the size of the set
// couldn't exceed that on 32 bit architectures.
sl := len(s)
data = append(data, []byte{
byte(sl >> 24),
byte(sl >> 16),
byte(sl >> 8),
byte(sl),
}...)
// Marshal each element in the set.
for k := range s {
data = append(data, []byte{
byte(k >> 24),
byte(k >> 16),
byte(k >> 8),
byte(k),
}...)
}
return data, nil
}
func (s set) add(v uint32) { s[v] = struct{}{} }
func (s set) has(v uint32) bool { _, ok := s[v]; return ok }
// bextr performs a bitfield extract on v. start should be the LSB of the field
// you wish to extract, and length the number of bits to extract.
//
// For example: start=0 and length=4 for the following 64-bit word would result
// in 1110 being returned.
//
// <snip 56 bits>00011110
// returns 1110
func bextr(v uint64, start, length uint8) uint64 {
return (v >> start) & ((1 << length) - 1)
}
func bextr32(v uint32, start, length uint8) uint32 {
return (v >> start) & ((1 << length) - 1)
}
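
A minimal usage sketch (not part of this diff) for the Plus type above:

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/estimator/hll"
)

func main() {
    sk := hll.NewDefaultPlus() // precision 16
    for i := 0; i < 100000; i++ {
        sk.Add([]byte(fmt.Sprintf("series-%d", i)))
    }
    fmt.Println(sk.Count()) // close to 100000, within the HLL++ error bounds

    // Sketches with the same precision can be merged.
    other := hll.NewDefaultPlus()
    other.Add([]byte("series-0"))
    if err := other.Merge(sk); err != nil {
        panic(err)
    }
    fmt.Println(other.Count())
}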


@@ -0,0 +1,683 @@
package hll
import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math"
"math/rand"
"reflect"
"testing"
"unsafe"
"github.com/davecgh/go-spew/spew"
)
func nopHash(buf []byte) uint64 {
if len(buf) != 8 {
panic(fmt.Sprintf("unexpected size buffer: %d", len(buf)))
}
return binary.BigEndian.Uint64(buf)
}
func toByte(v uint64) []byte {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], v)
return buf[:]
}
func TestPlus_Bytes(t *testing.T) {
testCases := []struct {
p uint8
normal bool
}{
{4, false},
{5, false},
{4, true},
{5, true},
}
for i, testCase := range testCases {
t.Run(fmt.Sprint(i), func(t *testing.T) {
h := NewTestPlus(testCase.p)
plusStructOverhead := int(unsafe.Sizeof(*h))
compressedListOverhead := int(unsafe.Sizeof(*h.sparseList))
var expectedDenseListCapacity, expectedSparseListCapacity int
if testCase.normal {
h.toNormal()
// denseList has capacity for 2^p elements, one byte each
expectedDenseListCapacity = int(math.Pow(2, float64(testCase.p)))
if expectedDenseListCapacity != cap(h.denseList) {
t.Errorf("denseList capacity: want %d got %d", expectedDenseListCapacity, cap(h.denseList))
}
} else {
// sparseList has capacity for 2^p elements, one byte each
expectedSparseListCapacity = int(math.Pow(2, float64(testCase.p)))
if expectedSparseListCapacity != cap(h.sparseList.b) {
t.Errorf("sparseList capacity: want %d got %d", expectedSparseListCapacity, cap(h.sparseList.b))
}
expectedSparseListCapacity += compressedListOverhead
}
expectedSize := plusStructOverhead + expectedDenseListCapacity + expectedSparseListCapacity
if expectedSize != h.Bytes() {
t.Errorf("Bytes(): want %d got %d", expectedSize, h.Bytes())
}
})
}
}
func TestPlus_Add_NoSparse(t *testing.T) {
h := NewTestPlus(16)
h.toNormal()
h.Add(toByte(0x00010fffffffffff))
n := h.denseList[1]
if n != 5 {
t.Error(n)
}
h.Add(toByte(0x0002ffffffffffff))
n = h.denseList[2]
if n != 1 {
t.Error(n)
}
h.Add(toByte(0x0003000000000000))
n = h.denseList[3]
if n != 49 {
t.Error(n)
}
h.Add(toByte(0x0003000000000001))
n = h.denseList[3]
if n != 49 {
t.Error(n)
}
h.Add(toByte(0xff03700000000000))
n = h.denseList[0xff03]
if n != 2 {
t.Error(n)
}
h.Add(toByte(0xff03080000000000))
n = h.denseList[0xff03]
if n != 5 {
t.Error(n)
}
}
func TestPlusPrecision_NoSparse(t *testing.T) {
h := NewTestPlus(4)
h.toNormal()
h.Add(toByte(0x1fffffffffffffff))
n := h.denseList[1]
if n != 1 {
t.Error(n)
}
h.Add(toByte(0xffffffffffffffff))
n = h.denseList[0xf]
if n != 1 {
t.Error(n)
}
h.Add(toByte(0x00ffffffffffffff))
n = h.denseList[0]
if n != 5 {
t.Error(n)
}
}
func TestPlus_toNormal(t *testing.T) {
h := NewTestPlus(16)
h.Add(toByte(0x00010fffffffffff))
h.toNormal()
c := h.Count()
if c != 1 {
t.Error(c)
}
if h.sparse {
t.Error("toNormal should convert to normal")
}
h = NewTestPlus(16)
h.hash = nopHash
h.Add(toByte(0x00010fffffffffff))
h.Add(toByte(0x0002ffffffffffff))
h.Add(toByte(0x0003000000000000))
h.Add(toByte(0x0003000000000001))
h.Add(toByte(0xff03700000000000))
h.Add(toByte(0xff03080000000000))
h.mergeSparse()
h.toNormal()
n := h.denseList[1]
if n != 5 {
t.Error(n)
}
n = h.denseList[2]
if n != 1 {
t.Error(n)
}
n = h.denseList[3]
if n != 49 {
t.Error(n)
}
n = h.denseList[0xff03]
if n != 5 {
t.Error(n)
}
}
func TestPlusCount(t *testing.T) {
h := NewTestPlus(16)
n := h.Count()
if n != 0 {
t.Error(n)
}
h.Add(toByte(0x00010fffffffffff))
h.Add(toByte(0x00020fffffffffff))
h.Add(toByte(0x00030fffffffffff))
h.Add(toByte(0x00040fffffffffff))
h.Add(toByte(0x00050fffffffffff))
h.Add(toByte(0x00050fffffffffff))
n = h.Count()
if n != 5 {
t.Error(n)
}
// not mutated, still returns correct count
n = h.Count()
if n != 5 {
t.Error(n)
}
h.Add(toByte(0x00060fffffffffff))
// mutated
n = h.Count()
if n != 6 {
t.Error(n)
}
}
func TestPlus_Merge_Error(t *testing.T) {
h := NewTestPlus(16)
h2 := NewTestPlus(10)
err := h.Merge(h2)
if err == nil {
t.Error("different precision should return error")
}
}
func TestHLL_Merge_Sparse(t *testing.T) {
h := NewTestPlus(16)
h.Add(toByte(0x00010fffffffffff))
h.Add(toByte(0x00020fffffffffff))
h.Add(toByte(0x00030fffffffffff))
h.Add(toByte(0x00040fffffffffff))
h.Add(toByte(0x00050fffffffffff))
h.Add(toByte(0x00050fffffffffff))
h2 := NewTestPlus(16)
h2.Merge(h)
n := h2.Count()
if n != 5 {
t.Error(n)
}
if h2.sparse {
t.Error("Merge should convert to normal")
}
if !h.sparse {
t.Error("Merge should not modify argument")
}
h2.Merge(h)
n = h2.Count()
if n != 5 {
t.Error(n)
}
h.Add(toByte(0x00060fffffffffff))
h.Add(toByte(0x00070fffffffffff))
h.Add(toByte(0x00080fffffffffff))
h.Add(toByte(0x00090fffffffffff))
h.Add(toByte(0x000a0fffffffffff))
h.Add(toByte(0x000a0fffffffffff))
n = h.Count()
if n != 10 {
t.Error(n)
}
h2.Merge(h)
n = h2.Count()
if n != 10 {
t.Error(n)
}
}
func TestHLL_Merge_Normal(t *testing.T) {
h := NewTestPlus(16)
h.toNormal()
h.Add(toByte(0x00010fffffffffff))
h.Add(toByte(0x00020fffffffffff))
h.Add(toByte(0x00030fffffffffff))
h.Add(toByte(0x00040fffffffffff))
h.Add(toByte(0x00050fffffffffff))
h.Add(toByte(0x00050fffffffffff))
h2 := NewTestPlus(16)
h2.toNormal()
h2.Merge(h)
n := h2.Count()
if n != 5 {
t.Error(n)
}
h2.Merge(h)
n = h2.Count()
if n != 5 {
t.Error(n)
}
h.Add(toByte(0x00060fffffffffff))
h.Add(toByte(0x00070fffffffffff))
h.Add(toByte(0x00080fffffffffff))
h.Add(toByte(0x00090fffffffffff))
h.Add(toByte(0x000a0fffffffffff))
h.Add(toByte(0x000a0fffffffffff))
n = h.Count()
if n != 10 {
t.Error(n)
}
h2.Merge(h)
n = h2.Count()
if n != 10 {
t.Error(n)
}
}
func TestPlus_Merge(t *testing.T) {
h := NewTestPlus(16)
k1 := uint64(0xf000017000000000)
h.Add(toByte(k1))
if !h.tmpSet.has(h.encodeHash(k1)) {
t.Error("key not in hash")
}
k2 := uint64(0x000fff8f00000000)
h.Add(toByte(k2))
if !h.tmpSet.has(h.encodeHash(k2)) {
t.Error("key not in hash")
}
if len(h.tmpSet) != 2 {
t.Error(h.tmpSet)
}
h.mergeSparse()
if len(h.tmpSet) != 0 {
t.Error(h.tmpSet)
}
if h.sparseList.count != 2 {
t.Error(h.sparseList)
}
iter := h.sparseList.Iter()
n := iter.Next()
if n != h.encodeHash(k2) {
t.Error(n)
}
n = iter.Next()
if n != h.encodeHash(k1) {
t.Error(n)
}
k3 := uint64(0x0f00017000000000)
h.Add(toByte(k3))
if !h.tmpSet.has(h.encodeHash(k3)) {
t.Error("key not in hash")
}
h.mergeSparse()
if len(h.tmpSet) != 0 {
t.Error(h.tmpSet)
}
if h.sparseList.count != 3 {
t.Error(h.sparseList)
}
iter = h.sparseList.Iter()
n = iter.Next()
if n != h.encodeHash(k2) {
t.Error(n)
}
n = iter.Next()
if n != h.encodeHash(k3) {
t.Error(n)
}
n = iter.Next()
if n != h.encodeHash(k1) {
t.Error(n)
}
h.Add(toByte(k1))
if !h.tmpSet.has(h.encodeHash(k1)) {
t.Error("key not in hash")
}
h.mergeSparse()
if len(h.tmpSet) != 0 {
t.Error(h.tmpSet)
}
if h.sparseList.count != 3 {
t.Error(h.sparseList)
}
iter = h.sparseList.Iter()
n = iter.Next()
if n != h.encodeHash(k2) {
t.Error(n)
}
n = iter.Next()
if n != h.encodeHash(k3) {
t.Error(n)
}
n = iter.Next()
if n != h.encodeHash(k1) {
t.Error(n)
}
}
func TestPlus_EncodeDecode(t *testing.T) {
h := NewTestPlus(8)
i, r := h.decodeHash(h.encodeHash(0xffffff8000000000))
if i != 0xff {
t.Error(i)
}
if r != 1 {
t.Error(r)
}
i, r = h.decodeHash(h.encodeHash(0xff00000000000000))
if i != 0xff {
t.Error(i)
}
if r != 57 {
t.Error(r)
}
i, r = h.decodeHash(h.encodeHash(0xff30000000000000))
if i != 0xff {
t.Error(i)
}
if r != 3 {
t.Error(r)
}
i, r = h.decodeHash(h.encodeHash(0xaa10000000000000))
if i != 0xaa {
t.Error(i)
}
if r != 4 {
t.Error(r)
}
i, r = h.decodeHash(h.encodeHash(0xaa0f000000000000))
if i != 0xaa {
t.Error(i)
}
if r != 5 {
t.Error(r)
}
}
func TestPlus_Error(t *testing.T) {
_, err := NewPlus(3)
if err == nil {
t.Error("precision 3 should return error")
}
_, err = NewPlus(18)
if err != nil {
t.Error(err)
}
_, err = NewPlus(19)
if err == nil {
t.Error("precision 17 should return error")
}
}
func TestPlus_Marshal_Unmarshal_Sparse(t *testing.T) {
h, _ := NewPlus(4)
h.sparse = true
h.tmpSet = map[uint32]struct{}{26: struct{}{}, 40: struct{}{}}
// Add a bunch of values to the sparse representation.
for i := 0; i < 10; i++ {
h.sparseList.Append(uint32(rand.Int()))
}
data, err := h.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Peeking at the first byte should reveal the version.
if got, exp := data[0], byte(2); got != exp {
t.Fatalf("got byte %v, expected %v", got, exp)
}
var res Plus
if err := res.UnmarshalBinary(data); err != nil {
t.Fatal(err)
}
// reflect.DeepEqual will always return false when comparing non-nil
// functions, so we'll set them to nil.
h.hash, res.hash = nil, nil
if got, exp := &res, h; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, wanted %v", spew.Sdump(got), spew.Sdump(exp))
}
}
func TestPlus_Marshal_Unmarshal_Dense(t *testing.T) {
h, _ := NewPlus(4)
h.sparse = false
// Add a bunch of values to the dense representation.
for i := 0; i < 10; i++ {
h.denseList = append(h.denseList, uint8(rand.Int()))
}
data, err := h.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Peeking at the first byte should reveal the version.
if got, exp := data[0], byte(2); got != exp {
t.Fatalf("got byte %v, expected %v", got, exp)
}
var res Plus
if err := res.UnmarshalBinary(data); err != nil {
t.Fatal(err)
}
// reflect.DeepEqual will always return false when comparing non-nil
// functions, so we'll set them to nil.
h.hash, res.hash = nil, nil
if got, exp := &res, h; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, wanted %v", spew.Sdump(got), spew.Sdump(exp))
}
}
// Tests that a sketch can be serialised / unserialised and keep an accurate
// cardinality estimate.
func TestPlus_Marshal_Unmarshal_Count(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode")
}
count := make(map[string]struct{}, 1000000)
h, _ := NewPlus(16)
buf := make([]byte, 8)
for i := 0; i < 1000000; i++ {
if _, err := crand.Read(buf); err != nil {
panic(err)
}
count[string(buf)] = struct{}{}
// Add to the sketch.
h.Add(buf)
}
gotC := h.Count()
epsilon := 15000 // 1.5%
if got, exp := math.Abs(float64(int(gotC)-len(count))), epsilon; int(got) > exp {
t.Fatalf("error was %v for estimation %d and true cardinality %d", got, gotC, len(count))
}
// Serialise the sketch.
sketch, err := h.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Deserialise.
h = &Plus{}
if err := h.UnmarshalBinary(sketch); err != nil {
t.Fatal(err)
}
// The count should be the same
oldC := gotC
if got, exp := h.Count(), oldC; got != exp {
t.Fatalf("got %d, expected %d", got, exp)
}
// Add some more values.
for i := 0; i < 1000000; i++ {
if _, err := crand.Read(buf); err != nil {
panic(err)
}
count[string(buf)] = struct{}{}
// Add to the sketch.
h.Add(buf)
}
// The sketch should still be working correctly.
gotC = h.Count()
epsilon = 30000 // 1.5%
if got, exp := math.Abs(float64(int(gotC)-len(count))), epsilon; int(got) > exp {
t.Fatalf("error was %v for estimation %d and true cardinality %d", got, gotC, len(count))
}
}
func NewTestPlus(p uint8) *Plus {
h, err := NewPlus(p)
if err != nil {
panic(err)
}
h.hash = nopHash
return h
}
// Generate random data to add to the sketch.
func genData(n int) [][]byte {
out := make([][]byte, 0, n)
for i := 0; i < n; i++ {
// Generate 8 random bytes; allocate a fresh buffer each iteration so the
// slices appended to out do not all alias the same backing array.
buf := make([]byte, 8)
n, err := rand.Read(buf)
if err != nil {
panic(err)
} else if n != 8 {
panic(fmt.Errorf("only %d bytes generated", n))
}
out = append(out, buf)
}
if len(out) != n {
panic(fmt.Sprintf("wrong size slice: %d", n))
}
return out
}
// Memoises values to be added to a sketch during a benchmark.
var benchdata = map[int][][]byte{}
func benchmarkPlusAdd(b *testing.B, h *Plus, n int) {
blobs, ok := benchdata[n]
if !ok {
// Generate it.
benchdata[n] = genData(n)
blobs = benchdata[n]
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < len(blobs); j++ {
h.Add(blobs[j])
}
}
b.StopTimer()
}
func BenchmarkPlus_Add_100(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 100)
}
func BenchmarkPlus_Add_1000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 1000)
}
func BenchmarkPlus_Add_10000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 10000)
}
func BenchmarkPlus_Add_100000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 100000)
}
func BenchmarkPlus_Add_1000000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 1000000)
}
func BenchmarkPlus_Add_10000000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 10000000)
}
func BenchmarkPlus_Add_100000000(b *testing.B) {
h, _ := NewPlus(16)
benchmarkPlusAdd(b, h, 100000000)
}

pkg/estimator/sketch.go (new file, 24 lines)

@@ -0,0 +1,24 @@
package estimator
import "encoding"
// Sketch is the interface representing a sketch for estimating cardinality.
type Sketch interface {
// Add adds a single value to the sketch.
Add(v []byte)
// Count returns a cardinality estimate for the sketch.
Count() uint64
// Merge merges another sketch into this one.
Merge(s Sketch) error
// Bytes estimates the memory footprint of the sketch, in bytes.
Bytes() int
// Clone returns a deep copy of the sketch.
Clone() Sketch
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
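
Because *hll.Plus satisfies this interface, callers can be written against estimator.Sketch rather than the concrete type; a small sketch (not part of this diff):

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/estimator"
    "github.com/influxdata/platform/pkg/estimator/hll"
)

// countDistinct depends only on the interface, not on the HLL implementation.
func countDistinct(s estimator.Sketch, values [][]byte) uint64 {
    for _, v := range values {
        s.Add(v)
    }
    return s.Count()
}

func main() {
    var s estimator.Sketch = hll.NewDefaultPlus()
    vals := [][]byte{[]byte("a"), []byte("b"), []byte("a")}
    fmt.Println(countDistinct(s, vals)) // 2
}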

pkg/limiter/fixed.go (new file, 46 lines)

@@ -0,0 +1,46 @@
// Package limiter provides concurrency limiters.
package limiter
// Fixed is a simple channel-based concurrency limiter. It uses a fixed-size
// channel of tokens: callers take a token before proceeding and release it when
// done. If all tokens are in use, the caller blocks until one is freed.
type Fixed chan struct{}
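// NewFixed returns a Fixed limiter that allows at most limit concurrent token holders.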
func NewFixed(limit int) Fixed {
return make(Fixed, limit)
}
// Idle returns true if all of the limiter's capacity is available.
func (t Fixed) Idle() bool {
return len(t) == cap(t)
}
// Available returns the number of available tokens that may be taken.
func (t Fixed) Available() int {
return cap(t) - len(t)
}
// Capacity returns the total number of tokens that can be taken.
func (t Fixed) Capacity() int {
return cap(t)
}
// TryTake attempts to take a token and returns true if successful, otherwise returns false.
func (t Fixed) TryTake() bool {
select {
case t <- struct{}{}:
return true
default:
return false
}
}
// Take attempts to take a token and blocks until one is available.
func (t Fixed) Take() {
t <- struct{}{}
}
// Release releases a token back to the limiter.
func (t Fixed) Release() {
<-t
}
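
A short usage sketch (not part of this diff) bounding the number of concurrent workers:

package main

import (
    "fmt"
    "sync"

    "github.com/influxdata/platform/pkg/limiter"
)

func main() {
    workers := limiter.NewFixed(2) // at most two goroutines run the job at once

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            workers.Take() // blocks until a token is free
            defer workers.Release()
            fmt.Println("running job", id)
        }(i)
    }
    wg.Wait()
}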

pkg/limiter/fixed_test.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package limiter_test
import (
"testing"
"github.com/influxdata/platform/pkg/limiter"
)
func TestFixed_Available(t *testing.T) {
f := limiter.NewFixed(10)
if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
f.Take()
if exp, got := 9, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
f.Release()
if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
}

pkg/limiter/write_test.go (new file, 34 lines)

@@ -0,0 +1,34 @@
package limiter_test
import (
"bytes"
"io"
"testing"
"time"
"github.com/influxdata/platform/pkg/limiter"
)
func TestWriter_Limited(t *testing.T) {
r := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024))
limit := 512 * 1024
w := limiter.NewWriter(discardCloser{}, limit, 10*1024*1024)
start := time.Now()
n, err := io.Copy(w, r)
elapsed := time.Since(start)
if err != nil {
t.Error("copy error: ", err)
}
rate := float64(n) / elapsed.Seconds()
if rate > float64(limit) {
t.Errorf("rate limit mismath: exp %f, got %f", float64(limit), rate)
}
}
type discardCloser struct{}
func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil }
func (d discardCloser) Close() error { return nil }

pkg/limiter/writer.go (new file, 83 lines)

@@ -0,0 +1,83 @@
package limiter
import (
"context"
"io"
"os"
"time"
"golang.org/x/time/rate"
)
type Writer struct {
w io.WriteCloser
limiter Rate
ctx context.Context
}
type Rate interface {
WaitN(ctx context.Context, n int) error
}
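// NewRate returns a token-bucket Rate that sustains bytesPerSec with a maximum burst of burstLimit.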
func NewRate(bytesPerSec, burstLimit int) Rate {
limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
limiter.AllowN(time.Now(), burstLimit) // spend initial burst
return limiter
}
// NewWriter returns a writer that implements io.Writer with rate limiting.
// The limiter uses a token bucket approach and limits the rate to bytesPerSec
// with a maximum burst of burstLimit.
func NewWriter(w io.WriteCloser, bytesPerSec, burstLimit int) *Writer {
limiter := NewRate(bytesPerSec, burstLimit)
return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}
// NewWriterWithRate returns a Writer with the specified rate limiter.
func NewWriterWithRate(w io.WriteCloser, limiter Rate) *Writer {
return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}
// Write writes bytes from b.
func (s *Writer) Write(b []byte) (int, error) {
if s.limiter == nil {
return s.w.Write(b)
}
n, err := s.w.Write(b)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, err
}
func (s *Writer) Sync() error {
if f, ok := s.w.(*os.File); ok {
return f.Sync()
}
return nil
}
func (s *Writer) Name() string {
if f, ok := s.w.(*os.File); ok {
return f.Name()
}
return ""
}
func (s *Writer) Close() error {
return s.w.Close()
}
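
A usage sketch (not part of this diff). Note that WaitN is called with the size of each Write, so the burst needs to be at least as large as the biggest single write (io.Copy from a bytes.Reader hands the whole buffer to one Write call):

package main

import (
    "bytes"
    "io"
    "os"

    "github.com/influxdata/platform/pkg/limiter"
)

func main() {
    f, err := os.Create("throttled.out") // hypothetical output file
    if err != nil {
        panic(err)
    }
    // ~512 KiB/s sustained, with a 10 MiB burst so a single large write fits.
    w := limiter.NewWriter(f, 512<<10, 10<<20)
    defer w.Close()

    src := bytes.NewReader(make([]byte, 1<<20)) // 1 MiB of zeros
    if _, err := io.Copy(w, src); err != nil {  // takes roughly two seconds
        panic(err)
    }
}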

pkg/mmap/mmap_solaris.go (new file, 45 lines)

@@ -0,0 +1,45 @@
// +build solaris
package mmap
import (
"os"
"syscall"
"golang.org/x/sys/unix"
)
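// Map memory-maps a file.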
func Map(path string, sz int64) ([]byte, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
} else if fi.Size() == 0 {
return nil, nil
}
// Use file size if map size is not passed in.
if sz == 0 {
sz = fi.Size()
}
data, err := unix.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
return data, nil
}
// Unmap closes the memory-map.
func Unmap(data []byte) error {
if data == nil {
return nil
}
return unix.Munmap(data)
}

pkg/mmap/mmap_test.go (new file, 22 lines)

@@ -0,0 +1,22 @@
package mmap_test
import (
"bytes"
"io/ioutil"
"testing"
"github.com/influxdata/platform/pkg/mmap"
)
func TestMap(t *testing.T) {
data, err := mmap.Map("mmap_test.go", 0)
if err != nil {
t.Fatalf("Open: %v", err)
}
if exp, err := ioutil.ReadFile("mmap_test.go"); err != nil {
t.Fatalf("ioutil.ReadFile: %v", err)
} else if !bytes.Equal(data, exp) {
t.Fatalf("got %q\nwant %q", string(data), string(exp))
}
}

pkg/mmap/mmap_unix.go (new file, 49 lines)

@@ -0,0 +1,49 @@
// +build darwin dragonfly freebsd linux nacl netbsd openbsd
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package mmap provides a way to memory-map a file.
package mmap
import (
"os"
"syscall"
)
// Map memory-maps a file.
func Map(path string, sz int64) ([]byte, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
} else if fi.Size() == 0 {
return nil, nil
}
// Use file size if map size is not passed in.
if sz == 0 {
sz = fi.Size()
}
data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
return data, nil
}
// Unmap closes the memory-map.
func Unmap(data []byte) error {
if data == nil {
return nil
}
return syscall.Munmap(data)
}
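
A usage sketch for the Unix build (not part of this diff); the mapping is read-only, and a size of 0 means "use the file's size":

package main

import (
    "fmt"

    "github.com/influxdata/platform/pkg/mmap"
)

func main() {
    data, err := mmap.Map("/etc/hosts", 0) // any existing file works here
    if err != nil {
        panic(err)
    }
    defer mmap.Unmap(data)
    fmt.Printf("mapped %d bytes read-only\n", len(data))
}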

pkg/mmap/mmap_windows.go (new file, 56 lines)

@@ -0,0 +1,56 @@
package mmap
import (
"os"
"syscall"
"unsafe"
)
// Map memory-maps a file.
func Map(path string, sz int64) ([]byte, error) {
fi, err := os.Stat(path)
if err != nil {
return nil, err
}
// Truncate file to size if too small.
if fi.Size() < sz {
if err := os.Truncate(path, sz); err != nil {
return nil, err
}
} else {
sz = fi.Size()
}
if sz == 0 {
return nil, nil
}
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
lo, hi := uint32(sz), uint32(sz>>32)
fmap, err := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, hi, lo, nil)
if err != nil {
return nil, err
}
defer syscall.CloseHandle(fmap)
ptr, err := syscall.MapViewOfFile(fmap, syscall.FILE_MAP_READ, 0, 0, uintptr(sz))
if err != nil {
return nil, err
}
data := (*[1 << 30]byte)(unsafe.Pointer(ptr))[:sz]
return data, nil
}
// Unmap closes the memory-map.
func Unmap(data []byte) error {
if data == nil {
return nil
}
return syscall.UnmapViewOfFile(uintptr(unsafe.Pointer(&data[0])))
}

pkg/rhh/rhh.go (new file, 286 lines)

@@ -0,0 +1,286 @@
package rhh
import (
"bytes"
"encoding/binary"
"sort"
"github.com/cespare/xxhash"
)
// HashMap represents a hash map that implements Robin Hood Hashing.
// https://cs.uwaterloo.ca/research/tr/1986/CS-86-14.pdf
type HashMap struct {
hashes []int64
elems []hashElem
n int64
capacity int64
threshold int64
mask int64
loadFactor int
tmpKey []byte
}
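// NewHashMap returns a new HashMap initialised with opt.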
func NewHashMap(opt Options) *HashMap {
m := &HashMap{
capacity: pow2(opt.Capacity), // Limited to 2^64.
loadFactor: opt.LoadFactor,
}
m.alloc()
return m
}
// Reset clears the values in the map without deallocating the space.
func (m *HashMap) Reset() {
for i := int64(0); i < m.capacity; i++ {
m.hashes[i] = 0
m.elems[i].reset()
}
m.n = 0
}
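// Get returns the value stored for key, or nil if the key does not exist.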
func (m *HashMap) Get(key []byte) interface{} {
i := m.index(key)
if i == -1 {
return nil
}
return m.elems[i].value
}
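// Put stores val under key, overwriting any existing value for that key.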
func (m *HashMap) Put(key []byte, val interface{}) {
// Grow the map if we've run out of slots.
m.n++
if m.n > m.threshold {
m.grow()
}
// If the key was overwritten then decrement the size.
overwritten := m.insert(HashKey(key), key, val)
if overwritten {
m.n--
}
}
func (m *HashMap) insert(hash int64, key []byte, val interface{}) (overwritten bool) {
pos := hash & m.mask
var dist int64
var copied bool
searchKey := key
// Continue searching until we find an empty slot or lower probe distance.
for {
e := &m.elems[pos]
// Empty slot found or matching key, insert and exit.
match := bytes.Equal(m.elems[pos].key, searchKey)
if m.hashes[pos] == 0 || match {
m.hashes[pos] = hash
e.hash, e.value = hash, val
e.setKey(searchKey)
return match
}
// If the existing elem has probed less than us, then swap places with
// existing elem, and keep going to find another slot for that elem.
elemDist := Dist(m.hashes[pos], pos, m.capacity)
if elemDist < dist {
// Swap with current position.
hash, m.hashes[pos] = m.hashes[pos], hash
val, e.value = e.value, val
m.tmpKey = assign(m.tmpKey, e.key)
e.setKey(searchKey)
if !copied {
searchKey = make([]byte, len(key))
copy(searchKey, key)
copied = true
}
searchKey = assign(searchKey, m.tmpKey)
// Update current distance.
dist = elemDist
}
// Increment position, wrap around on overflow.
pos = (pos + 1) & m.mask
dist++
}
}
// alloc elems according to currently set capacity.
func (m *HashMap) alloc() {
m.elems = make([]hashElem, m.capacity)
m.hashes = make([]int64, m.capacity)
m.threshold = (m.capacity * int64(m.loadFactor)) / 100
m.mask = int64(m.capacity - 1)
}
// grow doubles the capacity and reinserts all existing hashes & elements.
func (m *HashMap) grow() {
// Copy old elements and hashes.
elems, hashes := m.elems, m.hashes
capacity := m.capacity
// Double capacity & reallocate.
m.capacity *= 2
m.alloc()
// Copy old elements to new hash/elem list.
for i := int64(0); i < capacity; i++ {
elem, hash := &elems[i], hashes[i]
if hash == 0 {
continue
}
m.insert(hash, elem.key, elem.value)
}
}
// index returns the position of key in the hash map.
func (m *HashMap) index(key []byte) int64 {
hash := HashKey(key)
pos := hash & m.mask
var dist int64
for {
if m.hashes[pos] == 0 {
return -1
} else if dist > Dist(m.hashes[pos], pos, m.capacity) {
return -1
} else if m.hashes[pos] == hash && bytes.Equal(m.elems[pos].key, key) {
return pos
}
pos = (pos + 1) & m.mask
dist++
}
}
// Elem returns the i-th key/value pair of the hash map.
func (m *HashMap) Elem(i int64) (key []byte, value interface{}) {
if i >= int64(len(m.elems)) {
return nil, nil
}
e := &m.elems[i]
return e.key, e.value
}
// Len returns the number of key/values set in map.
func (m *HashMap) Len() int64 { return m.n }
// Cap returns the capacity of the hash map.
func (m *HashMap) Cap() int64 { return m.capacity }
// AverageProbeCount returns the average number of probes for each element.
func (m *HashMap) AverageProbeCount() float64 {
var sum float64
for i := int64(0); i < m.capacity; i++ {
hash := m.hashes[i]
if hash == 0 {
continue
}
sum += float64(Dist(hash, i, m.capacity))
}
return sum/float64(m.n) + 1.0
}
// Keys returns a list of sorted keys.
func (m *HashMap) Keys() [][]byte {
a := make([][]byte, 0, m.Len())
for i := int64(0); i < m.Cap(); i++ {
k, v := m.Elem(i)
if v == nil {
continue
}
a = append(a, k)
}
sort.Sort(byteSlices(a))
return a
}
type hashElem struct {
key []byte
value interface{}
hash int64
}
// reset clears the values in the element.
func (e *hashElem) reset() {
e.key = e.key[:0]
e.value = nil
e.hash = 0
}
// setKey copies v to a key on e.
func (e *hashElem) setKey(v []byte) {
e.key = assign(e.key, v)
}
// Options represents initialization options that are passed to NewHashMap().
type Options struct {
Capacity int64
LoadFactor int
}
// DefaultOptions represents a default set of options to pass to NewHashMap().
var DefaultOptions = Options{
Capacity: 256,
LoadFactor: 90,
}
// HashKey computes a hash of key. The hash is always non-zero because zero is reserved to mark empty slots.
func HashKey(key []byte) int64 {
h := int64(xxhash.Sum64(key))
if h == 0 {
h = 1
} else if h < 0 {
h = 0 - h
}
return h
}
// HashUint64 computes a hash of a uint64. Hash is always non-zero.
func HashUint64(key uint64) int64 {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, key)
return HashKey(buf)
}
// Dist returns the probe distance for a hash in a slot index.
// NOTE: Capacity must be a power of 2.
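// For example, with capacity 8 an element whose home slot is 6 but which is
// found at slot 1 has Dist = (1 + 8 - 6) & 7 = 3.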
func Dist(hash, i, capacity int64) int64 {
mask := capacity - 1
dist := (i + capacity - (hash & mask)) & mask
return dist
}
// pow2 returns the smallest power of 2 that is greater than or equal to v (minimum 2).
// For example, pow2(900) == 1024 and pow2(1024) == 1024.
func pow2(v int64) int64 {
for i := int64(2); i < 1<<62; i *= 2 {
if i >= v {
return i
}
}
panic("unreachable")
}
func assign(x, v []byte) []byte {
if cap(x) < len(v) {
x = make([]byte, len(v))
}
x = x[:len(v)]
copy(x, v)
return x
}
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

78
pkg/rhh/rhh_test.go Normal file
View File

@ -0,0 +1,78 @@
package rhh_test
import (
"bytes"
"math/rand"
"reflect"
"testing"
"testing/quick"
"github.com/influxdata/platform/pkg/rhh"
)
// Ensure hash map can perform basic get/put operations.
func TestHashMap(t *testing.T) {
m := rhh.NewHashMap(rhh.DefaultOptions)
m.Put([]byte("foo"), []byte("bar"))
m.Put([]byte("baz"), []byte("bat"))
// Verify values can be retrieved.
if v := m.Get([]byte("foo")); !bytes.Equal(v.([]byte), []byte("bar")) {
t.Fatalf("unexpected value: %s", v)
}
if v := m.Get([]byte("baz")); !bytes.Equal(v.([]byte), []byte("bat")) {
t.Fatalf("unexpected value: %s", v)
}
// Overwrite field & verify.
m.Put([]byte("foo"), []byte("XXX"))
if v := m.Get([]byte("foo")); !bytes.Equal(v.([]byte), []byte("XXX")) {
t.Fatalf("unexpected value: %s", v)
}
}
// Ensure hash map can insert random data.
func TestHashMap_Quick(t *testing.T) {
if testing.Short() {
t.Skip("short mode, skipping")
}
if err := quick.Check(func(keys, values [][]byte) bool {
m := rhh.NewHashMap(rhh.Options{Capacity: 1000, LoadFactor: 90})
h := make(map[string][]byte)
// Insert all key/values into both maps.
for i := range keys {
key, value := keys[i], values[i]
h[string(key)] = value
m.Put(key, value)
}
// Verify the maps are equal.
for k, v := range h {
if mv := m.Get([]byte(k)); !bytes.Equal(mv.([]byte), v) {
t.Fatalf("value mismatch:\nkey=%x\ngot=%x\nexp=%x\n\n", []byte(k), mv, v)
}
}
return true
}, &quick.Config{
Values: func(values []reflect.Value, rand *rand.Rand) {
n := rand.Intn(10000)
values[0] = GenerateByteSlices(rand, n)
values[1] = GenerateByteSlices(rand, n)
},
}); err != nil {
t.Fatal(err)
}
}
// GenerateByteSlices returns a random list of byte slices.
func GenerateByteSlices(rand *rand.Rand, n int) reflect.Value {
var a [][]byte
for i := 0; i < n; i++ {
v, _ := quick.Value(reflect.TypeOf(([]byte)(nil)), rand)
a = append(a, v.Interface().([]byte))
}
return reflect.ValueOf(a)
}

38
pkg/snowflake/README.md Normal file
View File

@ -0,0 +1,38 @@
Snowflake ID generator
======================
This is a Go implementation of [Twitter Snowflake](https://blog.twitter.com/2010/announcing-snowflake).
The most useful aspect of these IDs is that they are _roughly_ sortable: IDs generated
at roughly the same time will have values in close proximity to each other.
IDs
---
Each id is a 64-bit number, structured as follows:
```
6  6         5         4         3         2         1
3210987654321098765432109876543210987654321098765432109876543210
ttttttttttttttttttttttttttttttttttttttttttmmmmmmmmmmssssssssssss
```
where
* s (sequence) is a 12-bit integer that increments when multiple ids are generated within the same millisecond
* m (machine id) is a 10-bit integer identifying the server
* t (time) is a 42-bit integer holding the number of milliseconds elapsed since the custom epoch
  1491696000000, i.e. 2017-04-09T00:00:00Z (a sketch of how the fields unpack follows below)
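As an aside (not part of the package API), the layout above can be taken apart with a few shifts
and masks. The helper below is a hypothetical sketch based purely on the bit widths described here:
```
package main

import "fmt"

// decompose splits a snowflake id into its three fields, following the
// 42/10/12-bit layout described above. Hypothetical helper for illustration only.
func decompose(id uint64) (timestampMillis, machineID, sequence uint64) {
	const (
		sequenceBits = 12
		serverBits   = 10
		epoch        = 1491696000000 // 2017-04-09T00:00:00Z in milliseconds
	)
	sequence = id & (1<<sequenceBits - 1)
	machineID = (id >> sequenceBits) & (1<<serverBits - 1)
	timestampMillis = (id >> (sequenceBits + serverBits)) + epoch
	return timestampMillis, machineID, sequence
}

func main() {
	// Build an id for machine 5, sequence 7, at 1 ms past the epoch, then take it apart.
	id := uint64(1)<<22 | uint64(5)<<12 | 7
	fmt.Println(decompose(id)) // 1491696000001 5 7
}
```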
### String Encoding
The 64-bit unsigned integer is base-64 encoded, six bits per character, using the following URL-safe characters, which are ordered
according to their ASCII value.
```
0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~
```
A binary sort of a list of encoded values will be correctly ordered according to the numerical representation.
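A minimal usage sketch (assuming the import path introduced in this change); two ids generated
back to back encode to strings that compare in the same order as the ids themselves:
```
package main

import (
	"fmt"

	"github.com/influxdata/platform/pkg/snowflake"
)

func main() {
	g := snowflake.New(1)
	a, b := g.NextString(), g.NextString()
	// The later id always encodes to the lexicographically larger string.
	fmt.Println(a, b, a < b)
}
```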

124
pkg/snowflake/gen.go Normal file
View File

@ -0,0 +1,124 @@
package snowflake
import (
"fmt"
"sync/atomic"
"time"
)
const (
epoch = 1491696000000
serverBits = 10
sequenceBits = 12
timeBits = 42
serverShift = sequenceBits
timeShift = sequenceBits + serverBits
serverMax = ^(-1 << serverBits)
sequenceMask = ^(-1 << sequenceBits)
timeMask = ^(-1 << timeBits)
)
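// Generator produces unique, roughly sortable 64-bit snowflake ids and is safe for concurrent use.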
type Generator struct {
state uint64
machine uint64
}
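// New returns a Generator for the given machine id. It panics if the id is negative or greater than serverMax.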
func New(machineID int) *Generator {
if machineID < 0 || machineID > serverMax {
panic(fmt.Errorf("invalid machine id; must be 0 ≤ id < %d", serverMax))
}
return &Generator{
state: 0,
machine: uint64(machineID << serverShift),
}
}
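// MachineID returns the machine id the generator was configured with.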
func (g *Generator) MachineID() int {
return int(g.machine >> serverShift)
}
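// Next returns the next unique id. Ids from the same generator are monotonically increasing.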
func (g *Generator) Next() uint64 {
var state uint64
// we attempt 100 times to update the millisecond part of the state
// and increment the sequence atomically. each attempt takes roughly 30ns,
// so we spend around 3µs in total.
for i := 0; i < 100; i++ {
t := (now() - epoch) & timeMask
current := atomic.LoadUint64(&g.state)
currentTime := current >> timeShift & timeMask
currentSeq := current & sequenceMask
// this sequence of conditionals ensures a monotonically increasing
// state.
switch {
// if our time is in the future, use that with a zero sequence number.
case t > currentTime:
state = t << timeShift
// we now know that our time is at or before the current time.
// if we're at the maximum sequence, bump to the next millisecond
case currentSeq == sequenceMask:
state = (currentTime + 1) << timeShift
// otherwise, increment the sequence.
default:
state = current + 1
}
if atomic.CompareAndSwapUint64(&g.state, current, state) {
break
}
state = 0
}
// if state is still zero, all 100 attempts failed, which means high contention.
// bail out of the loop to bound the time we'll spend in this method, and just add
// one to the counter. this can cause millisecond drift, but hopefully
// some CAS eventually succeeds and fixes the milliseconds. additionally,
// if the sequence is already at the maximum, adding 1 here can cause
// it to roll over into the machine id. giving the CAS 100 attempts
// helps to avoid these problems.
if state == 0 {
state = atomic.AddUint64(&g.state, 1)
}
return state | g.machine
}
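// NextString returns the next id encoded as an 11-character sortable string.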
func (g *Generator) NextString() string {
var s [11]byte
encode(&s, g.Next())
return string(s[:])
}
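// AppendNext encodes the next id directly into s.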
func (g *Generator) AppendNext(s *[11]byte) {
encode(s, g.Next())
}
func now() uint64 { return uint64(time.Now().UnixNano() / 1e6) }
var digits = [...]byte{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c',
'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
'x', 'y', 'z', '~'}
func encode(s *[11]byte, n uint64) {
s[10], n = digits[n&0x3f], n>>6
s[9], n = digits[n&0x3f], n>>6
s[8], n = digits[n&0x3f], n>>6
s[7], n = digits[n&0x3f], n>>6
s[6], n = digits[n&0x3f], n>>6
s[5], n = digits[n&0x3f], n>>6
s[4], n = digits[n&0x3f], n>>6
s[3], n = digits[n&0x3f], n>>6
s[2], n = digits[n&0x3f], n>>6
s[1], n = digits[n&0x3f], n>>6
s[0] = digits[n&0x3f]
}

118
pkg/snowflake/gen_test.go Normal file
View File

@ -0,0 +1,118 @@
package snowflake
import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync/atomic"
"testing"
)
func TestEncode(t *testing.T) {
tests := []struct {
v uint64
exp string
}{
{0x000, "00000000000"},
{0x001, "00000000001"},
{0x03f, "0000000000~"},
{0x07f, "0000000001~"},
{0xf07f07f07f07f07f, "F1~1~1~1~1~"},
}
for _, test := range tests {
t.Run(fmt.Sprintf("0x%03x→%s", test.v, test.exp), func(t *testing.T) {
var s [11]byte
encode(&s, test.v)
if got, exp := string(s[:]), test.exp; got != exp {
t.Fatalf("got %q, expected %q", got, exp)
}
})
}
}
// TestSorting verifies that values encoded with the base-64 alphabet sort according to their numerical representation.
func TestSorting(t *testing.T) {
var (
vals = make([]string, 1000)
exp = make([]string, 1000)
)
for i := 0; i < len(vals); i++ {
var s [11]byte
encode(&s, uint64(i*47))
vals[i] = string(s[:])
exp[i] = string(s[:])
}
// randomize them
shuffle(len(vals), func(i, j int) {
vals[i], vals[j] = vals[j], vals[i]
})
sort.Strings(vals)
if !reflect.DeepEqual(vals, exp) {
t.Fatalf("got %v, expected %v", vals, exp)
}
}
func TestMachineID(t *testing.T) {
for i := 0; i < serverMax; i++ {
if got, exp := New(i).MachineID(), i; got != exp {
t.Fatalf("got %d, expected %d", got, exp)
}
}
}
func TestNextMonotonic(t *testing.T) {
g := New(10)
out := make([]string, 10000)
for i := range out {
out[i] = g.NextString()
}
// ensure they are all distinct and increasing
for i := range out[1:] {
if out[i] >= out[i+1] {
t.Fatal("bad entries:", out[i], out[i+1])
}
}
}
func BenchmarkEncode(b *testing.B) {
b.ReportAllocs()
var s [11]byte
for i := 0; i < b.N; i++ {
encode(&s, 100)
}
}
var blackhole uint64 // to make sure the g.Next calls are not removed
func BenchmarkNext(b *testing.B) {
g := New(10)
for i := 0; i < b.N; i++ {
blackhole += g.Next()
}
}
func BenchmarkNextParallel(b *testing.B) {
g := New(1)
b.RunParallel(func(pb *testing.PB) {
var lblackhole uint64
for pb.Next() {
lblackhole += g.Next()
}
atomic.AddUint64(&blackhole, lblackhole)
})
}
func shuffle(n int, swap func(i, j int)) {
for i := n - 1; i > 0; i-- {
j := rand.Intn(i + 1)
swap(i, j)
}
}

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/pkg/snowflake"
"github.com/influxdata/platform"
"github.com/influxdata/platform/pkg/snowflake"
)
func init() {

View File

@ -11,11 +11,11 @@ import (
"sort"
"time"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/estimator"
"github.com/influxdata/platform/pkg/limiter"
"go.uber.org/zap"
)

View File

@ -7,7 +7,7 @@ import (
"regexp"
"sort"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/platform/pkg/estimator"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"

View File

@ -5,8 +5,8 @@ import (
"sync/atomic"
"unsafe"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/bytesutil"
)
// SeriesCollection is a struct of arrays representation of a collection of series that allows

View File

@ -10,8 +10,8 @@ import (
"sync"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/pkg/binaryutil"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/binaryutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

View File

@ -7,8 +7,8 @@ import (
"io"
"os"
"github.com/influxdata/influxdb/pkg/mmap"
"github.com/influxdata/influxdb/pkg/rhh"
"github.com/influxdata/platform/pkg/mmap"
"github.com/influxdata/platform/pkg/rhh"
)
const (

View File

@ -10,8 +10,8 @@ import (
"sync"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/rhh"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/rhh"
"go.uber.org/zap"
)

View File

@ -10,7 +10,7 @@ import (
"os"
"strconv"
"github.com/influxdata/influxdb/pkg/mmap"
"github.com/influxdata/platform/pkg/mmap"
)
const (